如何编写一个Node.js模块来处理传入的pipe道stream

我试图编写一个节点模块,接受一个传入的pipe道二进制(或base64编码)stream,但坦率地说,我甚至不知道从哪里开始。 我看不到有关处理传入stream的Node文档中的任何示例; 我只看到消耗他们的例子?

举例来说,我想能够做到这一点:

var asset = new ProjectAsset('myFile', __dirname + '/image.jpg') var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }).pipe(asset) stream.on('finish', function() { done() }) 

我已经看到了这样的ProjectAsset ,但我在下一步去哪里的损失:

 'use strict' var stream = require('stream'), util = require('util') var ProjectAsset = function() { var self = this Object.defineProperty(self, 'binaryData', { configurable: true, writable: true }) stream.Stream.call(self) self.on('pipe', function(src) { // does it happen here? how do I set self.binaryData? }) return self } util.inherits(ProjectAsset, stream.Stream) module.exports = ProjectAsset module.exports.DEFAULT_FILE_NAME = 'file' 

可以从stream.Streaminheritance并使其工作,但是基于文档中可用的内容,我build议从stream.Writableinheritance。 pipe道到stream.Writable你需要有_write(chunk, encoding, done)来处理pipe道。 这里是一个例子:

 var asset = new ProjectAsset('myFile', __dirname + '/image.jpg') var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }).pipe(asset) stream.on('finish', function() { console.log(asset.binaryData); }) 

项目资产

 'use strict' var stream = require('stream'), util = require('util') var ProjectAsset = function() { var self = this self.data self.binaryData = []; stream.Writable.call(self) self._write = function(chunk, encoding, done) { // Can handle this data however you want self.binaryData.push(chunk.toString()) // Call after processing data done() } self.on('finish', function() { self.data = Buffer.concat(self.binaryData) }) return self } util.inherits(ProjectAsset, stream.Writable) module.exports = ProjectAsset module.exports.DEFAULT_FILE_NAME = 'file' 

如果您正在从stream读取数据,请参阅从stream.Duplexinheritance并且还包含_read(size)方法。

如果你正在做一些简单的事情,还有简化的构造函数api 。

我不知道这是否是exaclty你在找什么,但我想你可以处理它使用缓冲区的API与Buffer.concat缓冲区的数组可以检索streamdata侦听器

 'use strict' var stream = require('stream'), util = require('util'); var ProjectAsset = function() { var self = this Object.defineProperty(self, 'binaryData', { configurable: true, writable: true }) stream.Stream.call(self) var data; var dataBuffer=[]; self.on('data', function(chunk) { dataBuffer.push(chunk); }).on('end',function(){ data=Buffer.concat(dataBuffer); }); self.binaryData=data.toString('binary'); return self } util.inherits(ProjectAsset, stream.Stream) module.exports = ProjectAsset module.exports.DEFAULT_FILE_NAME = 'file' 

因为你使用var asset = new ProjectAsset('myFile', __dirname + '/image.jpg')我想你的ProjectAsset的责任是采取一些inputstream做一些转换,并将其写入文件。 你可以实现一个转换stream,因为你从stream接收到一些input,并生成一些输出,可以保存到一个文件或其他一些写入stream。

你当然可以通过从node.js 转换stream实现一个转换stream,但是inheritance是非常麻烦的,所以我的实现使用了through2来实现转换stream:

 module.exports = through2(function (chunk, enc, callback) { // This function is called whenever a piece of data from the incoming stream is read // Transform the chunk or buffer the chunk in case you need more data to transform // Emit a data package to the next stream in the pipe or omit this call if you need more data from the input stream to be read this.push(chunk); // Signal through2 that you processed the incoming data package callback(); })) 

用法

 var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }) .pipe(projectAsset) .pipe(fs.createWriteStream(__dirname + '/image.jpg')); 

正如你在这个例子中看到的,实现一个streampipe道完全分离了数据转换和数据保存。

工厂function

如果您喜欢在项目资产模块中使用类似构造函数的方法,因为您需要传递一些值或东西,您可以轻松地导出构造函数,如下所示

 var through2 = require('through2'); module.exports = function(someData) { // New stream is returned that can use someData argument for doing things return through2(function (chunk, enc, callback) { // This function is called whenever a piece of data from the incoming stream is read // Transform the chunk or buffer the chunk in case you need more data to transform // Emit a data package to the next stream in the pipe or omit this call if you need more data from the input stream to be read this.push(chunk); // Signal through2 that you processed the incoming data package callback(); }); } 

用法

 var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }) .pipe(projectAsset({ foo: 'bar' })) .pipe(fs.createWriteStream(__dirname + '/image.jpg'));