在Node.js中“重新分块”Stream对象

很简单的node.js问题。 我想扩展stream对象来重新组合来自远程连接的数据。 我正在做多个telnet和发送命令到其他服务器,他们发回回应。 看起来像这样

> Hello, this is a command This is the response to the command. Sometimes it pauses here (which triggers the 'data' event prematurely). But the message isn't over until you see the semicolon ; 

我想要做的是不要在暂停时触发“数据”事件,而是等待; 并触发一个自定义的“消息”事件。

我已经阅读并重读了这个问题 ,但是我还没有完全理解它(部分原因是因为它是关于可写入的stream,部分是因为我还没有注意到CoffeeScript)。

编辑:我想我在这里问两件事情:

  1. 如何扩展/inheritancenet.CreateConnection使用的stream对象?
  2. 我可以只扩展prototype.write来做一个“分割”并重新“发送”每个部分吗?

以下是我到目前为止所做的一切,但分块应该是stream的一部分,而不是“数据”监听器的一部分:

 var net = require('net'); var nodes = [ //list of ip addresses ]; function connectToServer(ip) { var conn = net.createConnection(3083, ip); conn.on('connect', function() { conn.write ("login command;"); }); conn.on('data', function(data) { var read = data.toString(); var message_list = read.split(/^;/m); message_list.forEach (function(message) { console.log("Atonomous message from " + ip + ':' + message); //I need to extend the stream object to emit these instead of handling it here //Also, sometimes the data chunking breaks the messages in two, //but it should really wait for a line beginning with a ; before it emits. }); }); conn.on('end', function() { console.log("Lost conncection to " + ip + "!!"); }); conn.on('error', function(err) { console.log("Connection error: " + err + " for ip " + ip); }); } nodes.forEach(function(node) { connectToServer(node); }); 

如果我正在使用一个原始stream,我想这将是这样的(根据我在别处find的代码)?

 var messageChunk = function () { this.readable = true; this.writable = true; }; require("util").inherits(messageChunk, require("stream")); messageChunk.prototype._transform = function (data) { var regex = /^;/m; var cold_storage = ''; if (regex.test(data)) { var message_list = read.split(/^;/m); message_list.forEach (function(message) { this.emit("data", message); }); } else { //somehow store the data until data with a /^;/ comes in. } } messageChunk.prototype.write = function () { this._transform.apply(this, arguments); }; 

但是我不使用原始stream,我使用net.createConnection对象中的stream对象返回。

不要使用直接实现的_transform,_read,_write或_flush函数,这些函数用于节点的内部使用。

当您看到字符“;”时发出自定义事件 在你的stream中:

 var msg = ""; conn.on("data",function(data) { var chunk = data.toString(); msg += chunk; if(chunk.search(";") != -1) { conn.emit("customEvent",msg); msg = ""; } }); conn.on("customEvent",function(msg) { //do something with your message });