合并Node.jsstream

我有一堆我读的文件,处理和合并来自相应的多个stream的某些数据到一个单一的stream。

有没有比下面更优雅的解决scheme(有一个单独的计数器,所有源stream发出end后调用combinedStream.end() ):

 let combinedStream = ....; let counter = 0; filePaths.forEach(function(filePath) { counter += 1; const fileStream = fs.createReadStream(filePath); const myStream = new MyStream(fileStream); myStream.on('data', myStream.write); myStream.on('end', function() { counter -= 1; if (counter === 0) { combinedStream.end(); } }); }); return combinedStream; 

一个更清洁的方法可以是那个回购中使用的方法,即使它只是把你的柜台藏在某个地方,让你处理一个更舒适的基于callback的模型。

这样,您的代码将如下所示:

 let sharedStream = ... function onEachFilename(filename, callback) { // here you can read from the stream and push the data on the shared one, // then invoke the "internal" callback on the end event } function onEndAll() { // here you can finalize and close the shared stream } forEach(filenames, onEachFilename, onEndAll); 

请记住,在所有的callback函数被调用后,某处仍有一个负责计数的函数,并调用onEnd函数。

你可以用一个Transformstream来处理文件,然后将其传递给一个PassThroughstream。

既然你用的是let ,我猜你可以用ES2015。

  "use strict"; let fs=require('fs'); let filePaths=['./tmp/h.txt','./tmp/s.txt']; let Stream = require('stream'); class StreamProcessor { constructor() { this.process_streams = []; } push (source_stream) { // Create a new Transform Stream let transform = new StreamTransformer(); // Register the finish event and pipe transform.processed = transform.wait.call(transform); source_stream.pipe(transform); // push the stream to the internal array this.process_streams.push(transform); } done (callback) { let streams = this.process_streams; // Wait for all Transform streams to finish processing Promise.all( streams.map(function(s) {return s.processed; }) ) .then ( function() { let combined_stream=new Stream.PassThrough(); streams.forEach(function (stream) { stream.pipe(combined_stream); }); // Call the callback with stream callback(null,combined_stream); }) .catch(function (err) { callback(err); }); } } class StreamTransformer extends Stream.Transform { constructor () { // call super super(); } _transform(chunk,enc, transformed) { // process files here let data=chunk.toString(); data=data.substring(0,data.length-2); this.push(data); transformed(); } _flush(flushed) { // for additonal at end this.push('\n'); flushed(); } wait() { // returns a promise that resolves, when all the data is processed; let stream = this; return new Promise(function(resolve,reject) { stream.on('finish', function() { resolve(true); }); stream.on('error', function(err) { reject(err); }); }); } } /// Now you can do.. let process_stream = new StreamProcessor(); filePaths.forEach(function (fpath) { let fstream = fs.createReadStream(fpath); process_stream.push(fstream); }); process_stream.done( function (err,combined_stream) { // Consume the combines stream combined_stream.pipe(process.stdout); }); 

testing文件包含“hello”和“stream”

  // Outputs is // hell // stream 

这可以进一步改善.. :/