将Observable实现到持久队列库中

目前正在编写一个小的持久队列库,将读/写行到一个文本文件。 这里是add方法,例如:

Queue.prototype.add = function(line, cb){ getLock(this, err => { if(err){ this.emit('error', err); releaseLock(err, cb); } else{ fs.appendFile(this.filepath, line, err => { err && this.emit('error', err); releaseLock(err, cb); }); } }); }; 

我觉得很尴尬,支持事件发射器和callback(或事件发射器和承诺)。

换句话说,对于队列中的每个方法(add,peek,remove),我需要返回/callback每个调用所特定的结果。 仅使用事件发射器意味着调用者可能会针对他们刚才所做的调用不是特定的结果。 所以callback或承诺似乎是必要的 – 你不能只使用事件发射器。

我想知道的是 – 可以观察到不知何故解决必须与事件发射器或事件发射器的承诺配对callback的问题?

我正在寻找一种方法来实现这种只有一种typesasynchronouscallback机制的evented /asynchronous队列。 也许observables不是这里的答案,但我正在寻找一个好的devise模式。

我不太清楚为什么你需要在这里的事件发射器….如果你使用observables每个用户将从他们自己的电话获得结果/错误。

我会重写你的方法是这样的:

 function appendFileObs(filePath, line){ return Rx.Observable.create((obs) => { fs.appendFile(filePath, line, (err, result) => { if(err) obs.onError(err); else { obs.onNext(result); obs.onCompleted(); } }); }); }); // Similar for getLock and releaseLock Queue.prototype.add = function(line){ return getLockObs(this) .flatMap(() => appendFileObs(this.filePath, line)) .flatMap(result => releaseLockObs(undefined).map(() => result)) .catch((err) => { return releaseLockObs(err); }); }; 

在这个解决scheme中,我并不自豪,这个stream有内部的副作用,这可能是可以改进的,但你明白了。

这样,当有人调用.add(line).subscribe()时,它会得到结果以及在他的调用中发生的错误。

如果您需要广播发生的错误,您可以使用BehaviourSubject,这是一个观察者,并在同一时间观察(有用的东西!)