RxJS具有asynchronous订户function的Observable

我试图做一些感觉应该是直截了当的事情,但certificate令人惊讶的困难。

我有一个订阅RabbitMQ队列的函数。 具体来说,这里是Channel.consume函数: http ://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

它返回一个承诺,这个承诺用一个订阅ID来解决 – 稍后需要取消订阅 – 并且还有一个callback参数来在邮件从队列中取出时调用。

当我想取消订阅队列时,我需要在这里使用Channel.cancel函数取消使用者: http ://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。 这需要先前返回的订阅ID。

我想把所有这些东西包装在观察者订阅时订阅队列的Observable中,并且当observable退订时取消订阅。 然而,由于调用的“双重asynchronous”性质(我的意思是说,它们同时具有callback和返回承诺),这样做有些困难。

理想情况下,我想写的代码是:

return new Rx.Observable(async (subscriber) => { var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message)); return async () => { await channel.cancel(consumeResult.consumerTag); }; }); 

然而,这是不可能的,因为这个构造函数不支持asynchronous订阅函数或者拆卸逻辑。

我一直无法找出这一个。 我在这里错过了什么? 为什么这么难?

干杯,Alex

创build的observable不需要等待channel.consume解决,因为观察者(这是一个观察者的传递,而不是订阅者)只能从你提供的函数中调用。

但是,您返回的取消订阅function将不得不等待该承诺才能解决。 它可以做到这一点,就像这样:

 return new Rx.Observable((observer) => { var consumeResult = channel.consume(queueName, (message) => observer.next(message)); return () => { consumeResult.then(() => channel.cancel(consumeResult.consumerTag)); }; });