Node.js – 需要帮助理解原生 stream(流)

我想弄清楚 stream 的原生实现是如何工作的。代码如下:

const Stream = require('stream');

/**
 * Readable stream that hands out the elements of an in-memory array,
 * one element per _read() call, followed by the end-of-stream marker.
 */
class SourceWrapper extends Stream.Readable {
  /**
   * @param {object} opt - Options forwarded unchanged to Stream.Readable.
   * @param {Array} content - Elements to emit, in order.
   */
  constructor(opt, content) {
    super(opt);
    this.content = content;
    this.len = content.length;
    this.index = 0;
  }

  _read() {
    // Take the current position and advance the cursor in one step.
    const position = this.index;
    this.index += 1;

    // Inside the array: emit the element; past the end: push null to end the stream.
    this.push(position < this.len ? this.content[position] : null);
  }
}

// Sample data: ten million copies of the number 1.
const arr = new Array(10000000).fill(1);

// Source stage of the pipeline.
const firstStream = new SourceWrapper({ objectMode: true }, arr);

// Shared _transform implementation: multiplies each value by a random number.
const transform = (x, enc, next) => next(undefined, x * Math.random(x, 10));

// Builds an object-mode Transform that uses the shared implementation above.
const makeMapStream = () => {
  const mapStream = new Stream.Transform({ objectMode: true });
  mapStream._transform = transform;
  return mapStream;
};

const firstMapStream = makeMapStream();
const secondMapStream = makeMapStream();

// Measure the time from here until the last stage emits 'finish'.
const start = new Date();

const finished = new Promise((resolve, reject) => {
  const lastStage = firstStream.pipe(firstMapStream).pipe(secondMapStream);
  lastStage.on('finish', () => {
    resolve(new Date());
  });
});

finished.then((end) => {
  console.log('execTime', end - start);
});

问题在于:它在小数据集(例如 [1,2,3,4])上可以正常工作,但在大数据集上运行时,似乎很快就终止了。

我遗漏了什么?这与 objectMode 有关系吗?

感谢任何帮助。

原因在于:必须有消费者从 stream 中读取数据,例如绑定 data 事件监听器,或者把它 pipe 到一个可写流(Writable)。否则最后一个 stream 的内部缓冲区被填满后,整条管道就会因背压(backpressure)而停止。我重写了您的代码,以便更清楚地说明这个问题。我还修正了会跳过索引 0 的索引错误。

 'use strict';

const Stream = require('stream');

/**
 * Readable source that emits the elements of an in-memory array, in order,
 * and then ends the stream.
 */
class SourceWrapper extends Stream.Readable {
  /**
   * @param {object} opt - Options forwarded unchanged to Stream.Readable.
   * @param {Array} content - Elements to emit, one per _read() call.
   */
  constructor(opt, content) {
    super(opt);
    this.content = content;
    this.len = content.length;
    this.index = 0;
  }

  /**
   * Pushes the next element, or null (end-of-stream) once every element
   * has been emitted.
   */
  _read() {
    if (this.index >= this.len) {
      this.push(null);
      return;
    }

    // Advance the cursor before calling push() so the bookkeeping never
    // depends on what push() does internally.
    const item = this.content[this.index];
    this.index += 1;
    this.push(item);
  }
}

/**
 * Transform implementation shared by both mapping stages: multiplies the
 * incoming value by a random factor. Math.random() takes no arguments.
 *
 * @param {number} x - Incoming value.
 * @param {string} enc - Encoding (unused here).
 * @param {Function} next - Callback receiving (error, transformedValue).
 */
const transform = (x, enc, next) => next(null, x * Math.random());

const transform1 = new Stream.Transform({ objectMode: true, transform });
const transform2 = new Stream.Transform({ objectMode: true, transform });

// Terminal consumer. Without a consumer at the end of the chain the last
// Transform's buffer fills up and the whole pipeline stalls.
const write = new Stream.Writable({
  objectMode: true,
  write(value, enc, next) {
    // Do something like writing...
    next();
  },
});

// generate some data
const arr = new Array(1000000).fill(1);

const read = new SourceWrapper({ objectMode: true }, arr);

new Promise((resolve, reject) => {
  // pipeline() forwards an error from any stage to the callback and destroys
  // the remaining streams; a plain .pipe() chain would leave this promise
  // pending forever on failure.
  Stream.pipeline(read, transform1, transform2, write, (err) => {
    if (err) {
      reject(err);
    } else {
      resolve();
    }
  });
})
  .then(() => {
    console.log('Done');
  })
  .catch((err) => {
    console.error('Pipeline failed', err);
    process.exitCode = 1;
  });