在Node.js中将三个不同的函数映射到Observable

我是Rxjs的新手。 如果可能,我想遵循最佳做法。

我正在执行三个不同的函数在可观察的返回相同的数据。 在“数据stream”概念之后,我一直认为我需要将这个可观察对象分成三个stream,并继续进行。

这是我的代码,所以我可以停止抽象地说话:

// NotEmptyResponse splits the stream in 2 to account based on whether I get an empty observable back. let base_subscription = RxNode.fromStream(siteStream).partition(NotEmptyResponse); // Success Stream to perform further actions upon. let successStream = base_subscription[0]; // The Empty stream for error reporting let failureStream = base_subscription[1]; //Code works up until this point. I don't know how to split to 3 different streams. successStream.filter(isSite) .map(grabData)// Async action that returns data /*** Perform 3 separate actions upon data that .map(grabData) returned **/ .subscribe(); 

我怎样才能将这个数据stream分成三个,并将每个数据实例映射到不同的函数?

事实上, partition()运算符在内部只是调用filter()运算符两次 。 首先从与predicate匹配的值创buildObservable,然后为与predicate不匹配的值创build一个Observable。

所以你可以用filter()运算符做同样的事情:

 let obs1 = base_subscription.filter(val => predicate1); let obs2 = base_subscription.filter(val => predicate2); let obs3 = base_subscription.filter(val => predicate3); 

现在你有三个可观测量,每个都只能发射一些特定的值。 那么你可以继续你现有的代码:

 obs2.filter(isSite) .map(grabData) .subscribe(); 

请注意,调用subscribe()会触发源Observable的生成值。 这不一定总是这样,取决于你使用的Observable。 请参阅文档中的“热”和“冷”观察点 。 运算符connect()可能对您有用,具体取决于您的用例。