如何用RxJS编写简单的元stream

背景

这是我第一次尝试反应式编程。 我有一个不时接收数据缓冲区的stream。

当一个缓冲区以02开始时,意味着一个消息的开始,当它以03结束时,意味着消息的结束。

例:

 - - - 02 53 44 5a - - - 52 6a 72 7a - - - 62 32 62 0d - 0a 03 - - - - > 

目的

我的目标是每次检测到一个消息完成时发出一个事件,以整个消息作为参数。

研究

阅读了几个关于响应式编程和阅读RxJS的 教程之后,我知道我想创build一个热门的可观察 。

通过我的研究,我相信我想要创造一个“元观察”或“元stream”,即可观察到的可观察到的stream或溪stream(可能是这样的混淆……)。

我的计划如下:

1 – 为所有进入2的缓冲区消息创build一个观测值 – 订阅步骤1,并创build一个观测值,检测消息的开始( 02 )和消息的结束( 03 )3 – 为步骤2订阅一些内容将照顾的数据。

所以,我的图如下所示:

 - 05 34 43 31 - - - 12 23 43 - - 02 53 44 5a - - - 52 6a 72 7a - - - 62 32 62 0d - 0a 03 - - - - > - - 02 53 44 5a - - - 52 6a 72 7a - - - 62 32 62 0d - 0a 03 - - | 

问题

1 – 因为我通过node.js中的套接字接收这些数据:

 socket = net.createConnection( { host: "localhost", port: 8080}, () => { socket.on( "data", console.log ); } ); 

我不认为我可以使用rxjs提供的EventEmmiter桥。 我将不得不使用Observer.create ,正确的?

2 – 即使我解决了问题1,我也不知道如何创build这种效果的stream。 可以有人发布代码片段作为例子吗?

你在正确的轨道上。 但是,你想创build一个observable,所以你可能想要Observable.create而不是Observer.create 。 这是非常混乱的,因为Observable.create需要一个接受Observer的函数。 那个函数可以把这个项目发送到那个Observer

你也热衷于认识到你想要一个热门的可观察的东西。 然而, Observable.create无论如何会给你一个热门的可观察的。 您仍然希望使用某种发布来避免多个订阅。 我们将用share来做到这一点。 至于缓冲,你可以使用buffer 。 在这一天结束时,我认为这将是像…

  • A – 使用Observable.create创build的observable包裹在你的套接字callback中。
  • Bshare A包装
  • C – flatMap B从缓冲区stream转到一系列的项目
  • D – 观看C,并使用filter / where只发射03
  • E buffer C使用D作为closuresselect器

然后将E作为您的服务的公共API。 E会在消息到达时发出整个消息。

*请注意,这种方法假定您接收到消息,因此我们不打扰02信号,因为我们知道消息结束后的第一个字节必须是下一个消息的开始。 如果不是这种情况,你会想要更好地处理。

**你可以在flatMap之后分享。 这将会稍微高效一些。 你不能在缓冲区之后共享。

***重新读你的问题,我注意到你想要一个stream的stream,而不是一个数组stream。 为了得到这个结果,你可以获取buffer的输出,并使用Observable.of通过flatMap运行它。这会给你带来冷stream的热stream。