如何确保在stream完成处理后执行asynchronous代码?

我有一个stream,通过侦听dataerrorend事件来处理,我调用一个函数来处理第一个stream中的每个data事件。 当然,处理数据的函数会调用其他callback函数,使得它是asynchronous的。 那么当stream中的数据被处理时,我该如何开始执行更多的代码呢? 在stream中收听end事件并不意味着asynchronousdata处理function已经完成。

当我执行下一个语句时,如何确保stream数据处理函数完成?

这里是一个例子:

 function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { var self = this; var promises = []; accountStream .on('data', function (account) { migrateAccount.bind(self)(account, finishMigration); }) .on('error', function (err) { return console.log(err); }) .on('end', function () { console.log("Finished updating account stream (but finishMigration is still running!!!)"); callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running! }); } var migrateAccount = function (oldAccount, callback) { executeSomeAction(oldAccount, function(err, newAccount) { if (err) return console.log("error received:", err); return callback(newAccount); }); } var finishMigration = function (newAccount) { // some code that is executed asynchronously... } 

如何确保callThisOnlyAfterAllAccountsAreMigrated在处理完stream之后被调用?

这可以用承诺来完成吗? 它可以通过stream? 我正在使用Nodejs,所以引用其他npm模块可能会有所帮助。

正如你所说,在stream上听end事件是无用的。 数据stream不知道或不在乎data处理程序中的data ,因此您需要编写一些代码来跟踪自己的migrateAccount状态。

如果是我,我会重写这整个部分。 如果在stream中使用带有.read()readable事件, .read()可以一次读取尽可能多的项目。 如果那是一个,没问题。 如果是30,太好了。 你这样做的原因是,你不会在数据stream中的数据上工作。 现在,如果accountStream速度很快,您的应用程序无疑会在某个时候崩溃。

当你从一个stream中读取一个项目并开始工作时,承诺你回来(使用蓝鸟或类似的),并将其放入一个数组。 承诺解决后,将其从数组中删除。 当stream结束时,将一个.done()处理程序附加到.all() (基本上在数组中的每个promise中都有一个大的承诺)。

你也可以使用一个简单的计数器进行工作。

使用通过stream(npm 通过2模块),我解决了这个问题,使用下面的代码来控制asynchronous行为:

 var through = require('through2').obj; function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { var self = this; var promises = []; accountStream.pipe(through(function(account, _, next) { migrateAccount.bind(self)(account, finishMigration, next); })) .on('data', function (account) { }) .on('error', function (err) { return console.log(err); }) .on('end', function () { console.log("Finished updating account stream"); callThisOnlyAfterAllAccountsAreMigrated(); }); } var migrateAccount = function (oldAccount, callback, next) { executeSomeAction(oldAccount, function(err, newAccount) { if (err) return console.log("error received:", err); return callback(newAccount, next); }); } var finishMigration = function (newAccount, next) { // some code that is executed asynchronously, but using 'next' callback when migration is finished... } 

当你通过承诺处理stream时,它会容易得多。

从这里复制一个使用spex库的例子:

 var spex = require('spex')(Promise); var fs = require('fs'); var rs = fs.createReadStream('values.txt'); function receiver(index, data, delay) { return new Promise(function (resolve) { console.log("RECEIVED:", index, data, delay); resolve(); // ok to read the next data; }); } spex.stream.read(rs, receiver) .then(function (data) { // streaming successfully finished; console.log("DATA:", data); }, function (reason) { // streaming has failed; console.log("REASON:", reason); });