将数据configuration为尚未准备好接收数据的可写入stream

有没有办法将可读stream连接到Node.js中的可写入stream,其中写入器尚未准备好接收数据? 换句话说,我想将可读性和可写性连接起来,但是我想在程序的稍后阶段初始化可写,包括定义写入方法。 也许我们必须实现写入方法,但有没有办法以类似的方式暂停可写入的stream,在这种方式中可以暂停可读的stream? 或者,也许我们可以使用中间通过/变换stream并在那里将数据缓冲在那里,然后再将数据传输到可写入数据中!

举例来说,我们通常会这样做:

readable.pipe(transform).pipe(writable); 

但我想要做一些事情:

 const tstrm = readable.pipe(transform); doSomethingAsync().then(function(){ tstrm.pipe(writable); }); 

只是想知道这是否可能,如何做到这一点,到目前为止无法解决这两个问题。

我想我正在寻找缓冲中间转换stream中的数据,然后连接/pipe理一个可写入的stream,然后,一旦连接,在任何新的数据之前先stream缓冲的数据。 似乎是一个合理的事情,不能find任何信息。

请注意,我在这里使用间隔模仿作者能够阅读或不能。 你可以做任何你想要的方式,即如果作家返回false你会更新状态开始缓冲等我认为最后一行是你想要的

 r.pipe(b).pipe(w); 

内容如下

 readStrem.pipe(transformBbuffer).pipe(writeStream); 

示例代码中,我们可以对缓冲所有数据进行一些更改。 我会在代码后面描述。 你需要知道的关于stream和更多的东西都在文档中,我认为他们可以做更多完整的例子,但是它们相当不错。

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

这个代码。

 var fs = require('fs'); var stream = require('stream') const util = require('util'); //const StringDecoder = require('string_decoder').StringDecoder; const Transform = require('stream').Transform; var check_buff = 0; var DRAIN_ME = 0; var r = fs.createReadStream('file1.txt').setEncoding('utf8'); var w = fs.createWriteStream('file2.txt'); var BufferStream = function () { stream.Transform.apply(this, arguments); this.buffer = []; }; util.inherits(BufferStream, stream.Transform); var intId; intId = setInterval(function(){ if(check_buff % 3 == 0) { DRAIN_ME = 1; return; } DRAIN_ME = 0; },10); BufferStream.prototype._transform = function (chunk, encoding, done) { this.buffer.push(String(chunk)); while(DRAIN_ME > 0 && this.buffer.length > 0) { this.push(this.buffer.shift()); } console.log(chunk.length); console.log(this.buffer.length); done(); }; var b = new BufferStream(); b.on('end', function(chunk) { clearInterval(intId); }); r.pipe(b).pipe(w); 

我正在寻找规范的方式来实现一个变换/通过stream,缓冲所有的数据,直到pipe道调用它。

进行以下更改

 BufferStream.prototype._transform = function (chunk, encoding, done) { this.buffer.push(String(chunk)); console.log(chunk.length); console.log(this.buffer.length); done(); }; ...... BufferStream.prototype._flush = function (cb) { var len = this.buffer.length; for (var i = 0; i < len; i++) { this.push(this.buffer.shift()); }; cb(); }; 

您也可以暂停可读stream,它将暂停可写stream,因为它停止接收数据,即…

要testing这个在磁盘上创build一个相当大的文件,即100MB或更多,并运行此…

 var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { var ready = 0; readableStream.pause(); setInterval(function(){ if(ready == 0) { //console.log('pausing'); readableStream.pause(); ready = 1; } else { //console.log('resuming'); readableStream.resume(); ready = 0; } },100); writableStream.write(chunk); }); 

立即暂停的原因是因为在间隔已经被激发10ms的时候文件可能已经被写入了。 有这种变化,即…

 var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); var ready = 0; setInterval(function(){ if(ready == 0) { //console.log('pausing'); readableStream.pause(); ready = 1; } else { //console.log('resuming'); readableStream.resume(); ready = 0; } },100); readableStream.on('data', function(chunk) { writableStream.write(chunk); readableStream.pause(); });