试图让我自己的RxJs可观察

我试图转换现有的API与RxJS一起工作…对于节点来说相当新,对于RxJs来说也很新,所以请耐心等待。

我有一个现有的API(getNextMessage),它可以阻塞(asynchronous),或者当某些东西变得可用时,通过节点样式(err,val)callback返回一个新的项目或错误。

所以它看起来像这样:

getNextMessage(nodeStyleCompletionCallback);

你可以把getNextMessage想象成一个http请求,在将来当服务器响应时,这个请求就会完成,但是一旦收到消息,你需要重新调用getNextMessage,以便不断从服务器获取新的项目。

所以,为了使它成为一个可观察的集合,我必须让RxJs继续调用我的getNextMessage函数,直到用户被处置();

基本上,我正在尝试创build自己的RxJs可观察集合。

问题是:

  1. 我不知道如何使subscriber.dispose()杀死async.forever
  2. 我可能不应该首先使用async.forever
  3. 我不确定我应该甚至为每条消息“完成” – 不应该在一个序列的末尾
  4. 我想最终删除使用fromNodeCallback的需要,有一个一stream的RxJS可观察
  5. 显然我有点困惑。

会爱一点帮助,谢谢!

这是我现有的代码:

var Rx = require('rx'); var port = require('../lib/port'); var async = require('async'); function observableReceive(portName) { var observerCallback; var listenPort = new port(portName); var disposed = false; var asyncReceive = function(asyncCallback) { listenPort.getNextMessage( function(error, json) { observerCallback(error, json); if (!disposed) setImmediate(asyncCallback); } ); } return function(outerCallback) { observerCallback = outerCallback; async.forever(asyncReceive); } } var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest')); var source = receive(); var subscription = source.forEach( function (json) { console.log('receive completed: ' + JSON.stringify(json)); }, function (error) { console.log("receive failed: " + error.toString()); }, function () { console.log('Completed'); subscription.dispose(); } ); 

所以这可能是我会做的。

 var Rx = require('Rx'); // This is just for kicks. You have your own getNextMessage to use. ;) var getNextMessage = (function(){ var i = 1; return function (callback) { setTimeout(function () { if (i > 10) { callback("lawdy lawd it's ova' ten, ya'll."); } else { callback(undefined, i++); } }, 5); }; }()); // This just makes an observable version of getNextMessage. var nextMessageAsObservable = Rx.Observable.create(function (o) { getNextMessage(function (err, val) { if (err) { o.onError(err); } else { o.onNext(val); o.onCompleted(); } }); }); // This repeats the call to getNextMessage as many times (11) as you want. // "take" will cancel the subscription after receiving 11 items. nextMessageAsObservable .repeat() .take(11) .subscribe( function (x) { console.log('next', x); }, function (err) { console.log('error', err); }, function () { console.log('done'); } ); 

我意识到这已经一年多了,但我认为更好的解决scheme是使用recursion调度:

 Rx.Observable.forever = function(next, scheduler) { scheduler = scheduler || Rx.Scheduler.default, //Internally wrap the the callback into an observable next = Rx.Observable.fromNodeCallback(next); return Rx.Observable.create(function(observer) { var disposable = new Rx.SingleAssignmentDisposable(), hasState = false; disposable.setDisposable(scheduler.scheduleRecursiveWithState(null, function(state, self) { hasState && observer.onNext(state); hasState = false; next().subscribe(function(x){ hasState = true; self(x); }, observer.onError.bind(observer)); })); return disposable; }); }; 

这里的想法是,你可以安排新的项目,一旦完成。 你调用next()来调用传入的方法,当它返回一个值时,你调度下一个调用项目。

你可以像这样使用它:

 Rx.Observable.forever(getNextMessage) .take(11) .subscribe(function(message) { console.log(message); }); 

在这里看到一个工作示例