Nodejsstream暂停(非pipe理)和恢复(pipe道)中间pipe道

我需要“暂停”一个可读的stream达几秒钟,然后重新开始。 可读stream正在被传送到一个转换stream,所以我不能使用常规的pauseresume方法,我不得不使用unpipepipe 。 在转换stream中,我能够检测到pipe事件,然后在可读stream上进行unpipe ,然后在几秒钟后,再次pipe恢复它(我希望)。

这里是代码:

main.ts

 import {Transform, Readable} from 'stream'; const alphaTransform = new class extends Transform { constructor() { super({ objectMode: true, transform: (chunk: string | Buffer, encoding: string, callback: Function) => { let transformed: IterableIterator<string>; if (Buffer.isBuffer(chunk)) { transformed = function* () { for (const val of chunk) { yield String.fromCharCode(val); } }(); } else { transformed = chunk[Symbol.iterator](); } callback(null, Array.from(transformed).map(s => s.toUpperCase()).join('')); } }); } } const spyingAlphaTransformStream = new class extends Transform { private oncePaused = false; constructor() { super({ transform: (chunk: string | Buffer, encoding: string, callback: Function) => { console.log('Before transform:'); if (Buffer.isBuffer(chunk)) { console.log(chunk.toString('utf-8')); alphaTransform.write(chunk); } else { console.log(chunk); alphaTransform.write(chunk, encoding); } callback(null, alphaTransform.read()); } }); this.on('pipe', (src: Readable) => { if (!this.oncePaused) { src.unpipe(this); // Here I unpipe the readable stream console.log(`Data event listeners count: ${src.listeners('data').length}`); console.log(`Readable state of reader: ${src.readable}`); console.log("We paused the reader!!"); setTimeout(() => { this.oncePaused = true; src.pipe(this); // Here I resume it...hopefully? src.resume(); console.log("We unpaused the reader!!"); console.log(`Data event listeners count: ${src.listeners('data').length}`); console.log(`Readable state of reader: ${src.readable}`); }, 1000); } }); this.on('data', (transformed) => { console.log('After transform:\n', transformed); }); } } const reader = new class extends Readable { constructor(private content?: string | Buffer) { super({ read: (size?: number) => { if (!this.content) { this.push(null); } else { this.push(this.content.slice(0, size)); this.content = this.content.slice(size); } } }); } } (new Buffer('The quick brown fox jumps over the lazy dog.\n')); reader.pipe(spyingAlphaTransformStream) .pipe(process.stdout); 

问题是与中间streamspyingAlphaTransformStream 。 这是一个监听pipe道事件,然后在1 second后暂停和恢复可读stream。 问题是,在将可读stream删除之后,再次对其进行pipe道操作,没有任何操作写入标准输出,这意味着spyingAlphaTransformStreamtransform方法从不会被调用,这意味着stream中的某些内容会被中断。

我期望输出看起来像这样:

 Data event listeners count: 0 Readable state of reader: true We paused the reader!! We unpaused the reader!! Data event listeners count: 1 Readable state of reader: true Before transform: The quick brown fox jumps over the lazy dog. After transform: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG. THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG. 

但它实际上看起来像:

 Data event listeners count: 0 Readable state of reader: true We paused the reader!! We unpaused the reader!! Data event listeners count: 1 Readable state of reader: true 

基本上没有什么是可读的pipe道是我可以从中得出的结论。

我怎样才能解决这个问题?

的package.json

 { "name": "hello-stream", "version": "1.0.0", "main": "main.ts", "scripts": { "start": "npm run build:live", "build:live": "nodemon" }, "keywords": [ "typescript", "nodejs", "ts-node", "cli", "node", "hello" ], "license": "WTFPL", "devDependencies": { "@types/node": "^7.0.21", "nodemon": "^1.11.0", "ts-node": "^3.0.4", "typescript": "^2.3.2" }, "dependencies": {} } 

nodemon.json

 { "ignore": ["node_modules"], "delay": "2000ms", "execMap": { "ts": "ts-node" }, "runOnChangeOnly": false, "verbose": true } 

tsconfig.json

 { "compilerOptions": { "target": "es2015", "module": "commonjs", "typeRoots": ["node_modules/@types"], "lib": ["es6", "dom"], "strict": true, "noUnusedLocals": true, "types": ["node"] } } 

解决scheme比我预想的要简单得多。 我所要做的是find一种方法来推迟在transform方法中完成的任何callback,并在调用初始callback之前等待stream已经“准备就绪”。

基本上,在spyingAlphaTransformStream构造函数,我有一个布尔值检查stream是否准备好,如果不是,我存储一个callback的类将执行我在transform方法中收到的第一个callback。 现在,由于第一个callback没有执行, stream不会接收进一步的调用,即只有一个悬而未决的callback担心; 所以它现在只是一个等待游戏,直到stream表明它已经准备好(这是用一个简单的setTimeout来完成的)。

当stream是“就绪”的时候,我把ready boolean设置为true,然后我调用待处理的callback函数(如果设置的话),并且在这一点上,stream遍历整个stream。

我有一个更长的例子来说明这是如何工作的:

 import {Transform, Readable} from 'stream'; const alphaTransform = new class extends Transform { constructor() { super({ objectMode: true, transform: (chunk: string | Buffer, encoding: string, callback: Function) => { let transformed: IterableIterator<string>; if (Buffer.isBuffer(chunk)) { transformed = function* () { for (const val of chunk) { yield String.fromCharCode(val); } }(); } else { transformed = chunk[Symbol.iterator](); } callback(null, Array.from(transformed).map(s => s.toUpperCase()).join('')); } }); } } class LoggingStream extends Transform { private pending: () => void; private isReady = false; constructor(message: string) { super({ objectMode: true, transform: (chunk: string | Buffer, encoding: string, callback: Function) => { if (!this.isReady) { // ready flag this.pending = () => { // create a pending callback console.log(message); if (Buffer.isBuffer(chunk)) { console.log(`[${new Date().toTimeString()}]: ${chunk.toString('utf-8')}`); } else { console.log(`[${new Date().toTimeString()}]: ${chunk}`); } callback(null, chunk); } } else { console.log(message); if (Buffer.isBuffer(chunk)) { console.log(`[${new Date().toTimeString()}]: ${chunk.toString('utf-8')}`); } else { console.log(`[${new Date().toTimeString()}]: ${chunk}`); } callback(null, chunk); } } }); this.on('pipe', this.pauseOnPipe); } private pauseOnPipe() { this.removeListener('pipe', this.pauseOnPipe); setTimeout(() => { this.isReady = true; // set ready flag to true if (this.pending) { // execute pending callbacks (if any) this.pending(); } }, 3000); // wait three seconds } } const reader = new class extends Readable { constructor(private content?: string | Buffer) { super({ read: (size?: number) => { if (!this.content) { this.push(null); } else { this.push(this.content.slice(0, size)); this.content = this.content.slice(size); } } }); } } (new Buffer('The quick brown fox jumps over the lazy dog.\n')); reader.pipe(new LoggingStream("Before transformation:")) .pipe(alphaTransform) .pipe(new LoggingStream("After transformation:")) .pipe(process.stdout); 

产量

 <Waits about 3 seconds...> Before transformation: [11:13:53 GMT-0600 (CST)]: The quick brown fox jumps over the lazy dog. After transformation: [11:13:53 GMT-0600 (CST)]: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG. THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG. 

注意,因为JS是单线程的,所以在继续之前,两个详细的stream都等待相同的时间量