转换stream以将string前置到每一行

我像这样产生了一个subprocess:

const n = cp.spawn('bash'); n.stdout.pipe(process.stdout); n.stderr.pipe(process.stderr); 

我正在寻找一个转换stream,以便我可以在孩子的每一行的开始处添加像[[child process]]之类的东西,所以我知道stdio是从父subprocess发出的。

所以它看起来像:

 const getTransformPrepender = function() : Transform { return ... } n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout); n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr); 

有没有人知道是否有这样的现有变换包或如何写一个?

我有这个:

 import * as stream from 'stream'; export default function(pre: string){ let saved = ''; return new stream.Transform({ transform(chunk, encoding, cb) { cb(null, String(pre) + String(chunk)); }, flush(cb) { this.push(saved); cb(); } }); } 

但是恐怕它不能在边缘情况下工作 – 在这种情况下,一个块突发可能不包括整行(对于很长的行)。

它看起来像这样的答案在这里: https : //strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/

但有了这个附录: https : //twitter.com/the1mills/status/886340747275812865

总共有三种情况需要正确处理:

  • 代表整条线的单个块
  • 代表多行的单个块
  • 一个块只代表部分行

下面是解决所有这三种情况的algorithm描述

  1. 接收大量的数据
  2. 扫描块的换行符
  3. 一旦find换行符,就把它放在它之前(包括换行符),然后把它作为单行input发送出去
  4. 重复,直到整个块已被处理(没有剩余的数据),或直到没有发现额外的换行符(一些数据保留,以备后用)

这里有一个实际的实现和描述为什么需要等

请注意,出于性能原因,我不会将缓冲区转换为经典的JSstring。

 const { Transform } = require('stream') const prefix = Buffer.from('[worker]: ') const prepender = new Transform({ transform(chunk, encoding, done) { this._rest = this._rest && this._rest.length ? Buffer.concat([this._rest, chunk]) : chunk let index // As long as we keep finding newlines, keep making slices of the buffer and push them to the // readable side of the transform stream while ((index = this._rest.indexOf('\n')) !== -1) { // The `end` parameter is non-inclusive, so increase it to include the newline we found const line = this._rest.slice(0, ++index) // `start` is inclusive, but we are already one char ahead of the newline -> all good this._rest = this._rest.slice(index) // We have a single line here! Prepend the string we want this.push(Buffer.concat([prefix, line])) } return void done() }, // Called before the end of the input so we can handle any remaining // data that we have saved flush(done) { // If we have any remaining data in the cache, send it out if (this._rest && this._rest.length) { return void done(null, Buffer.concat([prefix, this._rest]) } }, }) process.stdin.pipe(prepender).pipe(process.stdout) 

这是一个正在进行的工作:

https://github.com/ORESoftware/prepend-transform

但它的目的是解决手头的问题,如下所示:

 import pt from 'prepend-transform'; import * as cp from 'child_process'; const n = cp.spawn('bash'); n.stdout.pipe(pt('[child stdout]')).pipe(process.stdout); n.stderr.pipe(pt('[child stderr]')).pipe(process.stderr);