转换stream以将string前置到每一行
我像这样产生了一个subprocess:
const n = cp.spawn('bash'); n.stdout.pipe(process.stdout); n.stderr.pipe(process.stderr);
我正在寻找一个转换stream,以便我可以在孩子的每一行的开始处添加像[[child process]]之类的东西,所以我知道stdio是从父subprocess发出的。
所以它看起来像:
const getTransformPrepender = function() : Transform { return ... } n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout); n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);
有没有人知道是否有这样的现有变换包或如何写一个?
我有这个:
import * as stream from 'stream'; export default function(pre: string){ let saved = ''; return new stream.Transform({ transform(chunk, encoding, cb) { cb(null, String(pre) + String(chunk)); }, flush(cb) { this.push(saved); cb(); } }); }
但是恐怕它不能在边缘情况下工作 – 在这种情况下,一个块突发可能不包括整行(对于很长的行)。
它看起来像这样的答案在这里: https : //strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/
但有了这个附录: https : //twitter.com/the1mills/status/886340747275812865
总共有三种情况需要正确处理:
- 代表整条线的单个块
- 代表多行的单个块
- 一个块只代表部分行
下面是解决所有这三种情况的algorithm描述
- 接收大量的数据
- 扫描块的换行符
- 一旦find换行符,就把它放在它之前(包括换行符),然后把它作为单行input发送出去
- 重复,直到整个块已被处理(没有剩余的数据),或直到没有发现额外的换行符(一些数据保留,以备后用)
这里有一个实际的实现和描述为什么需要等
请注意,出于性能原因,我不会将缓冲区转换为经典的JSstring。
const { Transform } = require('stream') const prefix = Buffer.from('[worker]: ') const prepender = new Transform({ transform(chunk, encoding, done) { this._rest = this._rest && this._rest.length ? Buffer.concat([this._rest, chunk]) : chunk let index // As long as we keep finding newlines, keep making slices of the buffer and push them to the // readable side of the transform stream while ((index = this._rest.indexOf('\n')) !== -1) { // The `end` parameter is non-inclusive, so increase it to include the newline we found const line = this._rest.slice(0, ++index) // `start` is inclusive, but we are already one char ahead of the newline -> all good this._rest = this._rest.slice(index) // We have a single line here! Prepend the string we want this.push(Buffer.concat([prefix, line])) } return void done() }, // Called before the end of the input so we can handle any remaining // data that we have saved flush(done) { // If we have any remaining data in the cache, send it out if (this._rest && this._rest.length) { return void done(null, Buffer.concat([prefix, this._rest]) } }, }) process.stdin.pipe(prepender).pipe(process.stdout)
这是一个正在进行的工作:
https://github.com/ORESoftware/prepend-transform
但它的目的是解决手头的问题,如下所示:
import pt from 'prepend-transform'; import * as cp from 'child_process'; const n = cp.spawn('bash'); n.stdout.pipe(pt('[child stdout]')).pipe(process.stdout); n.stderr.pipe(pt('[child stderr]')).pipe(process.stderr);