如何用RxJS编写简单的元stream
背景
这是我第一次尝试反应式编程。 我有一个不时接收数据缓冲区的stream。
当一个缓冲区以02
开始时,意味着一个消息的开始,当它以03
结束时,意味着消息的结束。
例:
- - - 02 53 44 5a - - - 52 6a 72 7a - - - 62 32 62 0d - 0a 03 - - - - >
目的
我的目标是每次检测到一个消息完成时发出一个事件,以整个消息作为参数。
研究
阅读了几个关于响应式编程和阅读RxJS的 教程之后,我知道我想创build一个热门的可观察 。
通过我的研究,我相信我想要创造一个“元观察”或“元stream”,即可观察到的可观察到的stream或溪stream(可能是这样的混淆……)。
我的计划如下:
1 – 为所有进入2的缓冲区消息创build一个观测值 – 订阅步骤1,并创build一个观测值,检测消息的开始( 02
)和消息的结束( 03
)3 – 为步骤2订阅一些内容将照顾的数据。
所以,我的图如下所示:
- 05 34 43 31 - - - 12 23 43 - - 02 53 44 5a - - - 52 6a 72 7a - - - 62 32 62 0d - 0a 03 - - - - > - - 02 53 44 5a - - - 52 6a 72 7a - - - 62 32 62 0d - 0a 03 - - |
问题
1 – 因为我通过node.js中的套接字接收这些数据:
socket = net.createConnection( { host: "localhost", port: 8080}, () => { socket.on( "data", console.log ); } );
我不认为我可以使用rxjs提供的EventEmmiter桥。 我将不得不使用Observer.create
,正确的?
2 – 即使我解决了问题1,我也不知道如何创build这种效果的stream。 可以有人发布代码片段作为例子吗?
你在正确的轨道上。 但是,你想创build一个observable,所以你可能想要Observable.create
而不是Observer.create
。 这是非常混乱的,因为Observable.create
需要一个接受Observer
的函数。 那个函数可以把这个项目发送到那个Observer
。
你也热衷于认识到你想要一个热门的可观察的东西。 然而, Observable.create
无论如何会给你一个热门的可观察的。 您仍然希望使用某种发布来避免多个订阅。 我们将用share
来做到这一点。 至于缓冲,你可以使用buffer
。 在这一天结束时,我认为这将是像…
- A – 使用
Observable.create
创build的observable包裹在你的套接字callback中。 - B –
share
A包装 - C – flatMap B从缓冲区stream转到一系列的项目
- D – 观看C,并使用
filter
/where
只发射03
。 - E
buffer
C使用D作为closuresselect器
然后将E作为您的服务的公共API。 E会在消息到达时发出整个消息。
*请注意,这种方法假定您接收到消息,因此我们不打扰02
信号,因为我们知道消息结束后的第一个字节必须是下一个消息的开始。 如果不是这种情况,你会想要更好地处理。
**你可以在flatMap
之后分享。 这将会稍微高效一些。 你不能在缓冲区之后共享。
***重新读你的问题,我注意到你想要一个stream的stream,而不是一个数组stream。 为了得到这个结果,你可以获取buffer
的输出,并使用Observable.of
通过flatMap
运行它。这会给你带来冷stream的热stream。