Node.js:分割n部分的stream内容

我试图了解节点stream和它们的生命周期。 所以,我想分割stream的内容为n部分。 下面的代码只是为了解释我的意图,并表明我已经自己尝试了一些东西。 我省略了一些细节

我有一个stream只是产生一些数据(只是一个数字序列):

class Stream extends Readable { constructor() { super({objectMode: true, highWaterMark: 1}) this.counter = 0 } _read(size) { if(this.counter === 30) { this.push(null) } else { this.push(this.counter) } this.counter += 1 } } const stream = new Stream() stream.pause(); 

一个函数,试图采取下一个块:

 function take(stream, count) { const result = [] return new Promise(function(resolve) { stream.once('readable', function() { var chunk; do { chunk = stream.read() if (_.isNull(chunk) || result.length > count) { stream.pause() break } result.push(chunk) } while(true) resolve(result) }) }) } 

并想要像这样使用它:

 take(stream, 3) .then(res => { assert.deepEqual(res, [1, 2, 3]) return take(stream, 3) }) .then(res => { assert.deepEqual(res, [4, 5, 6]) }) 

什么是地道的方式来做到这一点?

使用ReadableStream可以使用单个函数来检查当前数据块的元素是否等于预期的结果。

创buildvariablesCHUNKN ,其中CHUNK是从原始数组切片或拼接的元素数量, N是在pull()调用中的每个.enqueue()调用中由CHUNK递增的variables。

 const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []]; let N = 0; const stream = new ReadableStream({ pull(controller) { if (N < data.length) // slice `N, N += CHUNK` elements from `data` controller.enqueue(data.slice(N, N += CHUNK)) else // if `N` is equal to `data.length` call `.close()` on stream controller.close() } }); const reader = stream.getReader(); const processData = ({value, done}) => { // if stream is closed return `result`; `reader.closed` returns a `Promise` if (done) return reader.closed.then(() => result); if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) { console.log(`N: ${N}, value: [${value}]`) result.push(...value); return reader.read().then(data => processData(data)) } } const readComplete = res => console.log(`result: [${res}]`); reader.read() .then(processData) .then(readComplete) .catch(err => console.log(err)); 

我认为这是可以帮助你的 – https://github.com/substack/stream-handbook

这是一个令人惊讶的详细的手册与各种stream场景的示例代码,我用同样的作为我自己的项目的参考,并发现它迄今为止有用! 它在/ examples中也有示例代码