RxJS:将Node.js套接字转换为Observable并将它们合并成一个stream

我试图将Node的套接字转换为使用RxJS的stream。 目标是让每个套接字创build它自己的stream,并将所有stream合并成一个。 当新的套接字连接时,将使用socketStream = Rx.Observable.fromEvent(socket, 'message')创build一个stream。

然后这个stream被合并到一个类似的主stream中

mainStream = mainStream.merge(socketStream)

这似乎工作正常,问题是,200-250客户端连接后,服务器引发RangeError: Maximum call stack size exceeded

我有示例服务器和客户端代码,在这里主要演示此行为: 示例服务器和客户端

我怀疑,随着客户端连接/断开连接,主stream不能正确清理。

问题是你正在recursion地合并你的Observable 。 每次你做

 cmdStream = cmdStream.merge(socketStream); 

您正在创build一个新的MergeObservable/MergeObserver对。

看一看源代码 ,你可以看到,你基本上每个订阅的内容是按顺序订阅你以前的每一个stream,所以不难看出,在大约250个连接上,你的调用堆栈可能至less是1000通话深度。

解决这个问题的一个更好的方法是转换使用flatMap操作符并将您的连接视为创buildObservable of Observables

 //Turn the connections themselves into an Observable var connections = Rx.Observable.fromEvent(server, 'connection', socket => new JsonSocket(socket)); connections //flatten the messages into their own Observable .flatMap(socket => { return Rx.Observable.fromEvent(socket.__socket, 'message') //Handle the socket closing as well .takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close')); }, (socket, msg) => { //Transform each message to include the socket as well. return { socket : socket.__socket, data : msg}; }) .subscribe(processData, handleError); 

上面我没有testing,但应该修复你的SO错误。

我可能也会质疑这个的总体devise。 你把所有的Observable合并Observable一起到底是什么? 你仍然通过传递socket对象和消息来区分它们,所以看起来这些可能都是不同的stream。