使用asynchronous处理高地stream块

我正在使用highland.js来处理文件,使用stream来读取两个分隔符之间的内容。 我还使用async.js按顺序运行一系列http请求。

理想情况下,我想将来自高地的输出x作为第一个函数传递给async系列(链),以便为从stream中提取的每个块执行HTTP请求。

这可能吗? 如果是这样,这怎么能实现呢?

 var async = require('async'); var _ = require('highland'); _(fs.createReadStream(files[0], { encoding: 'utf8' })) .splitBy('-----BEGIN-----\n') .splitBy('\n-----END-----\n') .filter(chunk => chunk !== '') .each(function (x) { }).done(function () { async.series([ function(callback) { setTimeout(function() { console.log('Task 1'); callback(null, 1); }, 300); }, function(callback) { setTimeout(function() { console.log('Task 2'); callback(null, 2); }, 200); }, ], function(error, results) { console.log(results); }); });; 

你可以摆脱对eachdone的电话。 过滤后,您可以使用.toArray(callback)进行跟踪。 callback传递一个包含高地结果的数组。 你可能会这样重构

 var Q = require('q'); var _ = require('highland'); _(fs.createReadStream(files[0], { encoding: 'utf8' })) .splitBy('-----BEGIN-----\n') .splitBy('\n-----END-----\n') .filter(chunk => chunk !== '') .each(asyncTasks); function asyncTasks(x) { // here, x will be each of the results from highland async.series([ // do something with x results function(callback) { console.log('Task 1'); callback(null, 1); }, // do something else with x results function(callback) { console.log('Task 2'); callback(null, 2); }, ], function(error, results) { console.log(results); }); } 

这里是toArray的文档链接。 toArray消耗stream,就像done一样。 如果您有任何问题,请告诉我。

虽然老实说,我认为你会更好地使用承诺。 虽然它的一部分只是个人喜好,部分原因是它使代码更具可读性。 从我读到的内容来看,asynchronous比承诺更高效,但承诺的好处在于可以将结果从一个函数传递到另一个函数。 所以在你的例子中,你可以在第一部分做一些x工作,然后把修改后的结果传给下一个函数,以及下一个函数等等。 当你使用async.series ,你通过调用callback(null, result)完成每一个函数,直到你完成了系列的最后async.series ,当你得到所有的结果调用callback 。 现在,您可以随时将结果保存到async.series之外的某个variables,但这会使代码变得更加混乱。 如果你想用诺言重写它,那看起来如下。 我在这里使用q ,但它只是你可以使用的许多承诺库中的一个。

  var async = require('async'); var _ = require('highland'); _(fs.createReadStream(files[0], { encoding: 'utf8' })) .splitBy('-----BEGIN-----\n') .splitBy('\n-----END-----\n') .filter(chunk => chunk !== '') .each(asyncTasks); function asyncTasks(x) { // here, x will be an array of the results from highland return asyncTask1(x) .then(asyncTask2) .then(asyncTask3) } function asyncTask1(x) { var deferred = Q.defer(); // do some stuff if (// some error condition) { deferred.reject(); } else { deferred.resolve(x); // or pass along some modified version of x } return deferred.promise; } function asyncTask2(x) { // same structure as above } function asyncTask3(x) { // same structure as above } 

一些asynchronous的APIs现在已经开始返回承诺,除了接受callback,有时代替。 所以这将是一件好事情,以适应。 承诺是超级有用的。 你可以阅读更多关于他们在这里和这里 。