如何确保在stream完成处理后执行asynchronous代码?
我有一个stream,通过侦听data
, error
和end
事件来处理,我调用一个函数来处理第一个stream中的每个data
事件。 当然,处理数据的函数会调用其他callback函数,使得它是asynchronous的。 那么当stream中的数据被处理时,我该如何开始执行更多的代码呢? 在stream中收听end
事件并不意味着asynchronousdata
处理function已经完成。
当我执行下一个语句时,如何确保stream数据处理函数完成?
这里是一个例子:
function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { var self = this; var promises = []; accountStream .on('data', function (account) { migrateAccount.bind(self)(account, finishMigration); }) .on('error', function (err) { return console.log(err); }) .on('end', function () { console.log("Finished updating account stream (but finishMigration is still running!!!)"); callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running! }); } var migrateAccount = function (oldAccount, callback) { executeSomeAction(oldAccount, function(err, newAccount) { if (err) return console.log("error received:", err); return callback(newAccount); }); } var finishMigration = function (newAccount) { // some code that is executed asynchronously... }
如何确保callThisOnlyAfterAllAccountsAreMigrated
在处理完stream之后被调用?
这可以用承诺来完成吗? 它可以通过stream? 我正在使用Nodejs,所以引用其他npm模块可能会有所帮助。
正如你所说,在stream上听end
事件是无用的。 数据stream不知道或不在乎data
处理程序中的data
,因此您需要编写一些代码来跟踪自己的migrateAccount状态。
如果是我,我会重写这整个部分。 如果在stream中使用带有.read()
的readable
事件, .read()
可以一次读取尽可能多的项目。 如果那是一个,没问题。 如果是30,太好了。 你这样做的原因是,你不会在数据stream中的数据上工作。 现在,如果accountStream速度很快,您的应用程序无疑会在某个时候崩溃。
当你从一个stream中读取一个项目并开始工作时,承诺你回来(使用蓝鸟或类似的),并将其放入一个数组。 承诺解决后,将其从数组中删除。 当stream结束时,将一个.done()
处理程序附加到.all()
(基本上在数组中的每个promise中都有一个大的承诺)。
你也可以使用一个简单的计数器进行工作。
使用通过stream(npm 通过2模块),我解决了这个问题,使用下面的代码来控制asynchronous行为:
var through = require('through2').obj; function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { var self = this; var promises = []; accountStream.pipe(through(function(account, _, next) { migrateAccount.bind(self)(account, finishMigration, next); })) .on('data', function (account) { }) .on('error', function (err) { return console.log(err); }) .on('end', function () { console.log("Finished updating account stream"); callThisOnlyAfterAllAccountsAreMigrated(); }); } var migrateAccount = function (oldAccount, callback, next) { executeSomeAction(oldAccount, function(err, newAccount) { if (err) return console.log("error received:", err); return callback(newAccount, next); }); } var finishMigration = function (newAccount, next) { // some code that is executed asynchronously, but using 'next' callback when migration is finished... }
当你通过承诺处理stream时,它会容易得多。
从这里复制一个使用spex库的例子:
var spex = require('spex')(Promise); var fs = require('fs'); var rs = fs.createReadStream('values.txt'); function receiver(index, data, delay) { return new Promise(function (resolve) { console.log("RECEIVED:", index, data, delay); resolve(); // ok to read the next data; }); } spex.stream.read(rs, receiver) .then(function (data) { // streaming successfully finished; console.log("DATA:", data); }, function (reason) { // streaming has failed; console.log("REASON:", reason); });