Nodejsstream暂停(非pipe理)和恢复(pipe道)中间pipe道
我需要“暂停”一个可读的stream达几秒钟,然后重新开始。 可读stream正在被传送到一个转换stream,所以我不能使用常规的pause
和resume
方法,我不得不使用unpipe
和pipe
。 在转换stream中,我能够检测到pipe
事件,然后在可读stream上进行unpipe
,然后在几秒钟后,再次pipe
恢复它(我希望)。
这里是代码:
main.ts
import {Transform, Readable} from 'stream'; const alphaTransform = new class extends Transform { constructor() { super({ objectMode: true, transform: (chunk: string | Buffer, encoding: string, callback: Function) => { let transformed: IterableIterator<string>; if (Buffer.isBuffer(chunk)) { transformed = function* () { for (const val of chunk) { yield String.fromCharCode(val); } }(); } else { transformed = chunk[Symbol.iterator](); } callback(null, Array.from(transformed).map(s => s.toUpperCase()).join('')); } }); } } const spyingAlphaTransformStream = new class extends Transform { private oncePaused = false; constructor() { super({ transform: (chunk: string | Buffer, encoding: string, callback: Function) => { console.log('Before transform:'); if (Buffer.isBuffer(chunk)) { console.log(chunk.toString('utf-8')); alphaTransform.write(chunk); } else { console.log(chunk); alphaTransform.write(chunk, encoding); } callback(null, alphaTransform.read()); } }); this.on('pipe', (src: Readable) => { if (!this.oncePaused) { src.unpipe(this); // Here I unpipe the readable stream console.log(`Data event listeners count: ${src.listeners('data').length}`); console.log(`Readable state of reader: ${src.readable}`); console.log("We paused the reader!!"); setTimeout(() => { this.oncePaused = true; src.pipe(this); // Here I resume it...hopefully? src.resume(); console.log("We unpaused the reader!!"); console.log(`Data event listeners count: ${src.listeners('data').length}`); console.log(`Readable state of reader: ${src.readable}`); }, 1000); } }); this.on('data', (transformed) => { console.log('After transform:\n', transformed); }); } } const reader = new class extends Readable { constructor(private content?: string | Buffer) { super({ read: (size?: number) => { if (!this.content) { this.push(null); } else { this.push(this.content.slice(0, size)); this.content = this.content.slice(size); } } }); } } (new Buffer('The quick brown fox jumps over the lazy dog.\n')); reader.pipe(spyingAlphaTransformStream) .pipe(process.stdout);
问题是与中间streamspyingAlphaTransformStream
。 这是一个监听pipe道事件,然后在1 second
后暂停和恢复可读stream。 问题是,在将可读stream删除之后,再次对其进行pipe道操作,没有任何操作写入标准输出,这意味着spyingAlphaTransformStream
的transform
方法从不会被调用,这意味着stream中的某些内容会被中断。
我期望输出看起来像这样:
Data event listeners count: 0 Readable state of reader: true We paused the reader!! We unpaused the reader!! Data event listeners count: 1 Readable state of reader: true Before transform: The quick brown fox jumps over the lazy dog. After transform: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG. THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
但它实际上看起来像:
Data event listeners count: 0 Readable state of reader: true We paused the reader!! We unpaused the reader!! Data event listeners count: 1 Readable state of reader: true
基本上没有什么是从可读的pipe道是我可以从中得出的结论。
我怎样才能解决这个问题?
的package.json
{ "name": "hello-stream", "version": "1.0.0", "main": "main.ts", "scripts": { "start": "npm run build:live", "build:live": "nodemon" }, "keywords": [ "typescript", "nodejs", "ts-node", "cli", "node", "hello" ], "license": "WTFPL", "devDependencies": { "@types/node": "^7.0.21", "nodemon": "^1.11.0", "ts-node": "^3.0.4", "typescript": "^2.3.2" }, "dependencies": {} }
nodemon.json
{ "ignore": ["node_modules"], "delay": "2000ms", "execMap": { "ts": "ts-node" }, "runOnChangeOnly": false, "verbose": true }
tsconfig.json
{ "compilerOptions": { "target": "es2015", "module": "commonjs", "typeRoots": ["node_modules/@types"], "lib": ["es6", "dom"], "strict": true, "noUnusedLocals": true, "types": ["node"] } }
解决scheme比我预想的要简单得多。 我所要做的是find一种方法来推迟在transform
方法中完成的任何callback,并在调用初始callback之前等待stream已经“准备就绪”。
基本上,在spyingAlphaTransformStream
构造函数,我有一个布尔值检查stream是否准备好,如果不是,我存储一个callback的类将执行我在transform
方法中收到的第一个callback。 现在,由于第一个callback没有执行, stream不会接收进一步的调用,即只有一个悬而未决的callback担心; 所以它现在只是一个等待游戏,直到stream表明它已经准备好(这是用一个简单的setTimeout
来完成的)。
当stream是“就绪”的时候,我把ready boolean设置为true,然后我调用待处理的callback函数(如果设置的话),并且在这一点上,stream遍历整个stream。
我有一个更长的例子来说明这是如何工作的:
import {Transform, Readable} from 'stream'; const alphaTransform = new class extends Transform { constructor() { super({ objectMode: true, transform: (chunk: string | Buffer, encoding: string, callback: Function) => { let transformed: IterableIterator<string>; if (Buffer.isBuffer(chunk)) { transformed = function* () { for (const val of chunk) { yield String.fromCharCode(val); } }(); } else { transformed = chunk[Symbol.iterator](); } callback(null, Array.from(transformed).map(s => s.toUpperCase()).join('')); } }); } } class LoggingStream extends Transform { private pending: () => void; private isReady = false; constructor(message: string) { super({ objectMode: true, transform: (chunk: string | Buffer, encoding: string, callback: Function) => { if (!this.isReady) { // ready flag this.pending = () => { // create a pending callback console.log(message); if (Buffer.isBuffer(chunk)) { console.log(`[${new Date().toTimeString()}]: ${chunk.toString('utf-8')}`); } else { console.log(`[${new Date().toTimeString()}]: ${chunk}`); } callback(null, chunk); } } else { console.log(message); if (Buffer.isBuffer(chunk)) { console.log(`[${new Date().toTimeString()}]: ${chunk.toString('utf-8')}`); } else { console.log(`[${new Date().toTimeString()}]: ${chunk}`); } callback(null, chunk); } } }); this.on('pipe', this.pauseOnPipe); } private pauseOnPipe() { this.removeListener('pipe', this.pauseOnPipe); setTimeout(() => { this.isReady = true; // set ready flag to true if (this.pending) { // execute pending callbacks (if any) this.pending(); } }, 3000); // wait three seconds } } const reader = new class extends Readable { constructor(private content?: string | Buffer) { super({ read: (size?: number) => { if (!this.content) { this.push(null); } else { this.push(this.content.slice(0, size)); this.content = this.content.slice(size); } } }); } } (new Buffer('The quick brown fox jumps over the lazy dog.\n')); reader.pipe(new LoggingStream("Before transformation:")) .pipe(alphaTransform) .pipe(new LoggingStream("After transformation:")) .pipe(process.stdout);
产量
<Waits about 3 seconds...> Before transformation: [11:13:53 GMT-0600 (CST)]: The quick brown fox jumps over the lazy dog. After transformation: [11:13:53 GMT-0600 (CST)]: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG. THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
注意,因为JS是单线程的,所以在继续之前,两个详细的stream都等待相同的时间量