Node.jsstream可以作为协程?

有没有办法使Node.jsstream作为协同程序。

例如斐波那契数字stream。

fibonacci.on('data', cb); //The callback (cb) is like function cb(data) { //something done with data here ... } 

期望

 function* fibonacciGenerator() { fibonacci.on('data', cb); //Don't know what has to be done further... }; var fibGen = fibonacciGenerator(); fibGen.next().value(cb); fibGen.next().value(cb); fibGen.next().value(cb); . . . 

从发生器获取所需的数字。 这里斐波那契数列仅仅是一个例子,实际上这个数据stream可以是任何文件,mongodb查询结果等等。

也许这样的事情

  1. 将“stream.on”函数作为生成器。
  2. 将产量放在callback函数中。
  3. 获取生成器对象。
  4. 接下来调用并获取stream中的下一个值。

如果是的话,至less是可能的,如果不是,为什么? 也许一个愚蠢的问题:)

如果你不想使用一个转译器(比如Babel),或者等到async / await使用Node.js,你可以使用生成器和promise来实现它。

缺点是你的代码必须存在于一个生成器中。



首先,您可以创build一个接收stream的帮助器,并返回一个函数,该函数在调用时返回stream( data )的下一个“事件”的承诺。

 function streamToPromises(stream) { return function() { if (stream.isPaused()) { stream.resume(); } return new Promise(function(resolve) { stream.once('data', function() { resolve.apply(stream, arguments); stream.pause(); }); }); } } 

当你不使用它时暂停stream,当你问下一个值时恢复它。


接下来,你有一个帮助器,它接收一个生成器作为参数,每当它产生一个承诺时,它就parsing它并将其结果传回给生成器。

 function run(fn) { var gen = fn(); var promise = gen.next().value; var tick = function() { promise.then(function() { promise = gen.next.apply(gen, arguments).value; }).catch(function(err) { // TODO: Handle error. }).then(function() { tick(); }); } tick(); } 

最后,你会在一个生成器中做你自己的逻辑,并用run helper运行它,就像这样:

 run(function*() { var nextFib = streamToPromises(fibonacci); var n; n = yield nextFib(); console.log(n); n = yield nextFib(); console.log(n); }); 
  • 你自己的发电机将产生承诺,暂停执行并将控制权交给runfunction。
  • runfunction将解决承诺,并将其值返回给您自己的发电机。

这是它的要点。 您需要修改streamToPromises以检查其他事件(例如enderror )。

 class FibonacciGeneratorReader extends Readable { _isDone = false; _fibCount = null; _gen = function *() { let prev = 0, curr = 1, count = 1; while (this._fibCount === -1 || count++ < this._fibCount) { yield curr; [prev, curr] = [curr, prev + curr]; } return curr; }.bind(this)(); constructor(fibCount) { super({ objectMode: true, read: size => { if (this._isDone) { this.push(null); } else { let fib = this._gen.next(); this._isDone = fib.done; this.push(fib.value.toString() + '\n'); } } }); this._fibCount = fibCount || -1; } } new FibonacciGeneratorReader(10).pipe(process.stdout); 

输出应该是:

  1
 1
 2
 3
五
 8
 13
 21
 34
 55