如何在node.js中将数组值排列为可读stream?

从数组和pipe道值创build可读stream的最佳方法是什么? 我已经看到了使用setInterval的substack的例子 ,我可以实现成功使用0的间隔值,但我迭代了大量的数据和触发gc每次放缓东西。

// Working with the setInterval wrapper var arr = [1, 5, 3, 6, 8, 9]; function createStream () { var t = new stream; t.readable = true; var times = 0; var iv = setInterval(function () { t.emit('data', arr[times]); if (++times === arr.length) { t.emit('end'); clearInterval(iv); } } }, 0); // Create the writable stream s // .... createStream().pipe(s); 

我想要做的是发出没有setInterval的值。 也许使用这样的asynchronous模块:

 async.forEachSeries(arr, function(item, cb) { t.emit('data', item); cb(); }, function(err) { if (err) { console.log(err); } t.emit('end'); }); 

在这种情况下,我迭代数组,并发出数据,但从来没有pipe任何值。 我已经看过shinout的ArrayStream ,但是我认为它是在v0.10之前创build的,比我想要的要多一点。

您可以通过创build可读的stream并将值推入其中来解决此问题。

stream是一种痛苦,但直接使用它们往往比使用库更容易 。

要传输的string或缓冲区的数组

如果你正在处理一个string或缓冲区的数组,这将工作:

 'use strict' const Stream = require('stream') const readable = new Stream.Readable() readable.pipe(process.stdout) const items = ['a', 'b', 'c'] items.forEach(item => readable.push(item)) // no more data readable.push(null) 

笔记:

  • readable.pipe(process.stdout)做两件事情:把stream放入“stream动”模式,并设置process.stdout可写stream从接收readable数据
  • Readable#push方法用于可读stream的创build者,而不是stream消费者。
  • 您必须执行Readable#push(null)来表示没有更多的数据。

要传输的非string数组

要从一个既不是string也不是缓冲区的数组中创build一个stream,需要可读stream和可写stream都处于“对象模式” 。 在下面的例子中,我做了以下更改:

  • {objectMode: true}初始化可读stream
  • 而不是pipe道到process.stdout ,pipe道到一个简单的可写入的对象模式的stream。

      'use strict' const Stream = require('stream') const readable = new Stream.Readable({objectMode: true}) const writable = new Stream.Writable({objectMode: true}) writable._write = (object, encoding, done) => { console.log(object) // ready to process the next chunk done() } readable.pipe(writable) const items = [1, 2, 3] items.forEach(item => readable.push(item)) // end the stream readable.push(null) 

性能说明

数据从哪里来? 如果它是stream式数据源,则最好使用变换stream来操作stream,而不是将其转换为数组。

这是一个古老的问题,但如果有人绊倒在这, node-stream-array是一个更简单,更优雅的实现Node.js> = v0.10

 var streamify = require('stream-array'), os = require('os'); streamify(['1', '2', '3', os.EOL]).pipe(process.stdout); 

我为此使用ArrayStream结束 。 它确实解决了GC被触发的问题。 我得到了recursionprocess.nextTick的警告,所以修改了ArrayStream中的nextTickcallbacksetImmediate,并修复了警告,似乎运作良好。