Node.js中的面向行的stream

我正在开发一个使用Node.js的多进程应用程序。 在这个应用程序中,父进程会产生一个subprocess,并通过pipe道使用基于JSON的消息传递协议与其进行通信。 我发现大的JSON消息可能会被“截断”,使得发送到pipe道上的数据侦听器的单个“块”不包含完整的JSON消息。 此外,小JSON消息可能被分组在同一块中。 每个JSON消息将由一个换行符分隔,所以我想知道是否已经有一个实用程序将缓冲pipe道读取stream,使它一次发出一行(因此,对于我的应用程序,一个JSON文档一次)。 这似乎是一个很常见的用例,所以我想知道是否已经完成。

我会很感激任何人可以提供的指导。 谢谢。

也许佩德罗的运营商可以帮助你?

运营商可以帮助您在node.js上实现新的终止协议。

客户可以给你发送大量的线路,运营商只会在每一条完成的线路上通知你。

我对这个问题的解决scheme是发送JSON消息,每个终止一些特殊的Unicode字符。 一个你永远不会在JSONstring中获得的字符。 称它为TERM。

所以发件人只是“JSON.stringify(message)+ TERM;” 并写它。 接收者然后将TERM中的包含的数据拆分,并用JSON.parse()来parsing这些部分,这很快。 诀窍是最后一条消息可能无法parsing,所以我们只需保存该消息片段,并将其添加到下一条消息的开头。 接收代码如下所示:

s.on("data", function (data) { var info = data.toString().split(TERM); info[0] = fragment + info[0]; fragment = ''; for ( var index = 0; index < info.length; index++) { if (info[index]) { try { var message = JSON.parse(info[index]); self.emit('message', message); } catch (error) { fragment = info[index]; continue; } } } }); 

其中“片段”被定义为在数据块之间持续存在的地方。

但是什么是TERM? 我使用了Unicodereplace字符'\ uFFFD'。 也可以使用Twitter所使用的技术,其中消息由'\ r \ n'分隔,而推文使用'\ n'作为新行,并且不包含'\ r \ n'

我觉得这比用长度和类似的东西搞乱要简单得多。

最简单的解决scheme是将每条消息前面的json数据长度作为固定长度的前缀(4字节?)发送,并有一个简单的非成帧parsing器,用于caching小块或分裂大块。

您可以尝试使用节点二进制文件来避免手动编写parsing器。 看看scan(key, buffer)文档示例 – 它完全一行一行地阅读。

只要换行符(或者你使用的任何分隔符)只能分隔JSON消息而不是embedded它们,你可以使用下面的模式:

 const buf = '' s.on('data', data => { buf += data.toString() const idx = buf.indexOf('\n') if (idx < 0) { return } // No '\n', no full message let lines = buf.split('\n') buf = lines.pop() // if ends in '\n' then buf will be empty for (let line of lines) { // Handle the line } })