rxjs,节点,处理程序中的订阅内存泄漏

假设你有一个像这样的express样式处理函数:

 (req, res, next) => { const requestedProgramIDs = req.body.programIDs; // assume all data is in a cache observable cache$.filter(data => requestedProgramIDs.indexOf(data.id) >= 0) .toArray() .subscribe(programs => res.end(JSON.stringify(programs))) }; 

我的问题是这个订阅会泄漏内存吗? 我担心,如果我只是unsubscribefunction的结束,那么不能保证订阅代码将执行。

我的解决scheme思路如下:

  • 使用.take(1)确保取消订阅
  • 如果observable的scheduler是同步的,那么取消订阅将是同步的,我可以在函数的末尾加上它。

问题:

  • 我是否认为订阅会泄漏?
  • 如果,那么你会如何build议我处理(防止)泄漏?

可观察合同订阅和取消订阅部分规定:

当Observable向观察者发出OnError或OnComplete通知时,这会结束订阅。 观察者不需要发出取消订阅通知来结束由Observable以这种方式结束的订阅。

因此,如果您的cache$ observable完成 – 它必须为toArray运算符发出一个数组 – 它不需要调用unsubscribe 。 只要组成的observable完成或错误,它不会泄漏。

另外,您可能还想连线,以便将错误转发到Express:

 .subscribe(programs => res.end(JSON.stringify(programs)), next)