文件收集和聚合结果报告的操作与非阻塞IO

我想对任意大的文件进行一些任意的昂贵的工作。 我想实时报告进度,然后在所有文件处理完毕后显示结果。 如果没有与我的expression式匹配的文件,我想抛出一个错误。

想象一下,编写一个testing框架,加载所有testing文件,执行它们(不按特定顺序),实时报告进度,然后在所有testing完成后显示聚合结果。

用阻塞语言(比如Ruby)编写这个代码非常简单。

事实certificate,在节点中执行这个看似简单的任务时遇到了问题,同时也真正利用了基于事件的asynchronousIO。

我的第一个devise是连续执行每一步。

  1. 加载所有的文件,创build一个文件集合进行处理
  2. 处理集合中的每个文件
  3. 在处理完所有文件后报告结果

这种方法确实有效,但对我来说看起来并不完全正确,因为它导致我的程序中计算量更大的部分等待所有的文件IO完成。 Node是不是被devise来避免的那种等待?

我的第二个devise是处理每个文件,因为它是在磁盘上asynchronous发现的。 为了争辩,让我们设想一下这样的方法:

eachFileMatching(path, expression, callback) { // recursively, asynchronously traverse the file system, // calling callback every time a file name matches expression. } 

而这种方法的消费者看起来像这样:

 eachFileMatching('test/', /_test.js/, function(err, testFile) { // read and process the content of testFile }); 

虽然这个devise感觉像是一个非常“节点”的IO工作方式,它有两个主要问题(至less在我大概是错误的实现):

  1. 我不知道什么时候所有的文件都被处理了,所以我不知道什么时候汇编和发布结果。
  2. 由于文件读取是非阻塞的,recursion的,我正在努力如何知道是否没有find文件。

我希望我只是在做一些错误的事情,并且有一些合理简单的策略,其他人用第二种方法工作。

尽pipe这个例子使用了一个testing框架,但是我还有许多其他项目碰到这个完全相同的问题,而且我想象任何人写一个相当复杂的应用程序来访问节点中的文件系统也是一样。

“读取和处理testFile的内容”是什么意思?

我不明白为什么你不知道什么时候处理所有的文件。 你不使用Streams吗? 一个stream有几个事件,不只是data 。 如果您处理end事件,那么您将知道每个文件何时完成。

例如,你可能有一个文件名list ,设置每个文件的处理,然后当你得到一个end事件时,从列表中删除文件名。 当清单是空的,你完成了。 或者创build一个包含名称和完成状态的FileName对象。 当你得到一个end事件时,改变状态并减less一个文件名计数器。 当计数器变为零时,您完成了,或者如果您不确定,则可以扫描所有的FileName对象以确保其状态已完成。

您可能还有一个定时器定期检查计数器,如果它在一段时间内没有改变,请报告处理可能卡住状态未完成的FileName对象。

…我刚刚在另一个问题中遇到了这种情况,接受的答案(加上github链接)解释得很好。 检查事件驱动代码循环吗?

事实certificate,我所能build立的最小的工作解决scheme比我所希望的要复杂得多。

以下是适合我的代码。 它可能会被清理,或在这里和那里稍微更易读,而且我对这样的反馈不感兴趣。

如果解决这个问题有一个非常不同的方法,那就更简单了,或者说效率更高,我对这个问题很感兴趣。 我真的很惊讶,看似简单的需求的解决scheme需要大量的代码,但也许这就是为什么有人发明阻塞io?

复杂性实际上是为了满足以下所有要求:

  • 处理find的文件
  • 知道什么时候search完成
  • 知道是否没有find文件

代码如下:

 /** * Call fileHandler with the file name and file Stat for each file found inside * of the provided directory. * * Call the optionally provided completeHandler with an array of files (mingled * with directories) and an array of Stat objects (one for each of the found * files. * * Following is an example of a simple usage: * * eachFileOrDirectory('test/', function(err, file, stat) { * if (err) throw err; * if (!stat.isDirectory()) { * console.log(">> Found file: " + file); * } * }); * * Following is an example that waits for all files and directories to be * scanned and then uses the entire result to do something: * * eachFileOrDirectory('test/', null, function(files, stats) { * if (err) throw err; * var len = files.length; * for (var i = 0; i < len; i++) { * if (!stats[i].isDirectory()) { * console.log(">> Found file: " + files[i]); * } * } * }); */ var eachFileOrDirectory = function(directory, fileHandler, completeHandler) { var filesToCheck = 0; var checkedFiles = []; var checkedStats = []; directory = (directory) ? directory : './'; var fullFilePath = function(dir, file) { return dir.replace(/\/$/, '') + '/' + file; }; var checkComplete = function() { if (filesToCheck == 0 && completeHandler) { completeHandler(null, checkedFiles, checkedStats); } }; var onFileOrDirectory = function(fileOrDirectory) { filesToCheck++; fs.stat(fileOrDirectory, function(err, stat) { filesToCheck--; if (err) return fileHandler(err); checkedFiles.push(fileOrDirectory); checkedStats.push(stat); fileHandler(null, fileOrDirectory, stat); if (stat.isDirectory()) { onDirectory(fileOrDirectory); } checkComplete(); }); }; var onDirectory = function(dir) { filesToCheck++; fs.readdir(dir, function(err, files) { filesToCheck--; if (err) return fileHandler(err); files.forEach(function(file, index) { file = fullFilePath(dir, file); onFileOrDirectory(file); }); checkComplete(); }); } onFileOrDirectory(directory); }; 

这样做的两种方式,首先可能被认为是连续的

 var files = []; doFile(files, oncomplete); function doFile(files, oncomplete) { if (files.length === 0) return oncomplete(); var f = files.pop(); processFile(f, function(err) { // Handle error if any doFile(files, oncomplete); // Recurse }); }; function processFile(file, callback) { // Do whatever you want to do and once // done call the callback ... callback(); }; 

第二种方式,我们把它叫做并行类似于summin:

 var files = []; doFiles(files, oncomplete); function doFiles(files, oncomplete) { var exp = files.length; var done = 0; for (var i = 0; i < exp; i++) { processFile(files[i], function(err) { // Handle errors (but still need to increment counter) if (++done === exp) return oncomplete(); }); } }; function processFile(file, callback) { // Do whatever you want to do and once // done call the callback ... callback(); }; 

现在看起来很明显,你应该使用第二种方法,但是你会发现对于IO密集型操作,在并行化时你并没有真正获得任何性能上的提升。 第一种方法的一个缺点是recursion可能会吹出你的堆栈跟踪。

TNX

圭多