RxJS:将Node.js套接字转换为Observable并将它们合并成一个stream
我试图将Node的套接字转换为使用RxJS的stream。 目标是让每个套接字创build它自己的stream,并将所有stream合并成一个。 当新的套接字连接时,将使用socketStream = Rx.Observable.fromEvent(socket, 'message')
创build一个stream。
然后这个stream被合并到一个类似的主stream中
mainStream = mainStream.merge(socketStream)
这似乎工作正常,问题是,200-250客户端连接后,服务器引发RangeError: Maximum call stack size exceeded
。
我有示例服务器和客户端代码,在这里主要演示此行为: 示例服务器和客户端
我怀疑,随着客户端连接/断开连接,主stream不能正确清理。
问题是你正在recursion地合并你的Observable
。 每次你做
cmdStream = cmdStream.merge(socketStream);
您正在创build一个新的MergeObservable/MergeObserver
对。
看一看源代码 ,你可以看到,你基本上每个订阅的内容是按顺序订阅你以前的每一个stream,所以不难看出,在大约250个连接上,你的调用堆栈可能至less是1000通话深度。
解决这个问题的一个更好的方法是转换使用flatMap
操作符并将您的连接视为创buildObservable
of Observables
。
//Turn the connections themselves into an Observable var connections = Rx.Observable.fromEvent(server, 'connection', socket => new JsonSocket(socket)); connections //flatten the messages into their own Observable .flatMap(socket => { return Rx.Observable.fromEvent(socket.__socket, 'message') //Handle the socket closing as well .takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close')); }, (socket, msg) => { //Transform each message to include the socket as well. return { socket : socket.__socket, data : msg}; }) .subscribe(processData, handleError);
上面我没有testing,但应该修复你的SO错误。
我可能也会质疑这个的总体devise。 你把所有的Observable
合并Observable
一起到底是什么? 你仍然通过传递socket对象和消息来区分它们,所以看起来这些可能都是不同的stream。