集成testinggeteventstore使用rxjs有竞争条件

对不起,这个有点乱。 我的项目在nodejs中。 我在摩卡testing。 在这里我打开一个连接到geteventstore并订阅一个stream。 这基本上开始发射事件。

我把这个事件订阅包装在一个rxjs observable中,然后把它写到控制台。

有一半的时间我有一半的时间,我没有得到一个事件stream。

我感觉到eventloop开始聆听,没有听到任何东西,并在geteventstore可以开始使用事件爆炸之前closures。

我有点不知所措 我可以告诉geteventstore发送数据cuz一半的时间,我得到它。 我的理解是,只要有人订阅了一个事件,例如有一个事件侦听器,循环就会保持打开状态。

所以也许问题是与rxjs?

我不知道,任何帮助将不胜感激。

– – 编辑

我不知道这是否会有所帮助,但testing看起来像这样。

context('when calling subscription', ()=> { it('should stay open', function () { mut = bootstrap.getInstanceOf('gesConnection'); var rx = bootstrap.getInstanceOf('rx'); var subscription = mut.subscribeToAllFrom(); rx.Observable.fromEvent(subscription, 'event').forEach(x=> console.log(x)); subscription.on('event', function (payload) { console.log('event received by dispatcher'); console.log('event processed by dispatcher'); }); mut._handler._connectingPhase.must.equal('Connected'); }) }); 

所以mut是与geteventstore的连接,rx是rxjs,而订阅对象是一个事件驱动程序,它将数据从geteventstore中抽出。

我明白,这个问题是由于它处理至less两个有点不寻常的产品,geteventstore和rxjs。

我的意思是,我非常相信gesConnection和订阅实际上是连接和发布的,我只是不知道如何进一步testing/调查。

谢谢

我没有看到你利用摩卡的asynchronoustesting工具 。

MochaJs不知道它应该等待你的testing时间超过你的函数返回的时间。

通常你会返回一个承诺:

  it('must stay open', () => { mut = bootstrap.getInstanceOf('gesConnection'); var rx = bootstrap.getInstanceOf('rx'); var subscription = mut.subscribeToAllFrom(); subscription.on('event', function (payload) { console.log('event received by dispatcher'); console.log('event processed by dispatcher'); }); var promise = rx.Observable .fromEvent(subscription, 'event') .take(100) // stop test after 100 events .do(x => console.log(x)) .finally(() => { // do any cleanup here. // such as close your connection // or "subscription" variable }) .toPromise(); mut._handler._connectingPhase.must.equal('Connected'); // tells Mocha to wait until the observable completes return promise; });