rxjs在错误之后立即完成观察,而不是继续

我试图通过一个observable来传输一组内容,并在第一个错误之后停止。 把它看作一系列的项目是简单的,因为它的行为是一样的。

  1. 我正在创build一个项目的数组观察
  2. 将每个项目映射到一个URL
  3. 将URL称为请求承诺
  4. 执行一个catch(),在发生错误时返回一个observable.empty()

使用RxJS 5:

rx.Observable.from(array) .map(self.createUrl) .flatMap(x => { var options = { uri: url, headers: { "Content-Type": "application/json" }; return rx.Observable.fromPromise(request-promise(options)); }) .catch(() => { return rx.Observable.empty();}) .subscribe( x => console.log('success:', x), e => console.log('error'), () => console.log('complete')); 

在执行此序列时,遇到第一个错误后代码将停止。 我怀疑在#4中的空观察是终止观察,但我不知道为什么。

我期望的过程是无论错误如何处理数组中的所有项目 – 最终处理所有成功的项目,并在每个错误后恢复。

你只需要把catch()放在flatMap

 rx.Observable.from(array) .map(self.createUrl) .flatMap(x => { var options = { uri: url, headers: { "Content-Type": "application/json" } }; return rx.Observable .fromPromise(request-promise(options)) .catch(() => rx.Observable.empty()); }) .subscribe( x => console.log('success:', x), e => console.log('error'), () => console.log('complete')); 

现在,当内部Observable发出错误时,它将被立即捕获,并不会通过.flatMap()传播到主stream。