node.js如何在从stream中导入数据库后同步最终callback

这听起来像一个非常典型的用例如Qasync库,但我真的不知道什么是最好的方式来做到这一点。

我想用150行(使用node-csv )导入CSV文件,并为每行创build一个mongo文档。 然而,streamparsing看起来比“db插入”更快,所以我遇到了callback被调用得太早的问题。

 // importtest.mocha.js [...] importer.loadFromCsv (url, function(result) { result.length.should.be.equal (150); // nope, it's always around 41 } // importer.js function loadFromCsv (url, callback){ csv().from.stream(url) .on ('record', function(record, index){ new Row({data: record}).save(function() { console.log ('saved a row to db'); }); }) .on ('end', function() { callback (Row.find({})); // E parser finished, but probably not all Row.save() }); } 

所以,可以请任何人给我一个提示如何我可以修复asynchronous/承诺,以便当parsing/数据stream插入是asynchronous,最后的callback将完成后,所有插入完成?

当你插入许多logging时,你应该分别照顾每一个logging。 这是一个未经testing的代码片段,你可以尝试和适应。 实际上,你创build了一个promise列表,当它们全部被parsing后,传递给(fn)的函数被触发。 那么,正如代码中提到的那样,你应该照顾有错误的logging。 请注意,传递给(fn)的函数只有在所有的promise都被parsing(successfuly)的时候才会被执行。 为了表明对于logging的错误,你应该使用defer.reject()而不是def.resolve()。 还要为onErrorFn占位符传递一个函数。 其类似的SQL事务。

这里是包含注释的代码:

 var q = require('q'); function loadFromCsv (url, callback){ // create an array holding all promises var csv_promises = []; csv().from.stream(url) .on ('record', function(record, index){ // create new defer object, per row var row_defer = q.defer(); // make sure, this function gets called, only after the row got saved new Row({data: record}).save(function() { console.log ('saved a row to db'); // resolves the promise, per row row_defer.resolve(record); // todo: take care for an error, per row }); csv_promises.push(row_defer.promise); // add promise to promise list, per row }) .on ('end', function() { // callback (Row.find({})); // E parser finished, but probably not all Row.save() // q.all gets resolved and fires passed function as soon as ALL promises in csv_promises array are resolved // todo: take care for errors q.all(csv_promises).then(function() { callback( csv_promises ); } /*, onErrorFn */ ); } loadFromCsv( "URL", function(rows) { console.log("Treated rows: ", rows.length); });