在Node.js中将三个不同的函数映射到Observable
我是Rxjs的新手。 如果可能,我想遵循最佳做法。
我正在执行三个不同的函数在可观察的返回相同的数据。 在“数据stream”概念之后,我一直认为我需要将这个可观察对象分成三个stream,并继续进行。
这是我的代码,所以我可以停止抽象地说话:
// NotEmptyResponse splits the stream in 2 to account based on whether I get an empty observable back. let base_subscription = RxNode.fromStream(siteStream).partition(NotEmptyResponse); // Success Stream to perform further actions upon. let successStream = base_subscription[0]; // The Empty stream for error reporting let failureStream = base_subscription[1]; //Code works up until this point. I don't know how to split to 3 different streams. successStream.filter(isSite) .map(grabData)// Async action that returns data /*** Perform 3 separate actions upon data that .map(grabData) returned **/ .subscribe();
我怎样才能将这个数据stream分成三个,并将每个数据实例映射到不同的函数?
事实上, partition()
运算符在内部只是调用filter()
运算符两次 。 首先从与predicate
匹配的值创buildObservable,然后为与predicate
不匹配的值创build一个Observable。
所以你可以用filter()运算符做同样的事情:
let obs1 = base_subscription.filter(val => predicate1); let obs2 = base_subscription.filter(val => predicate2); let obs3 = base_subscription.filter(val => predicate3);
现在你有三个可观测量,每个都只能发射一些特定的值。 那么你可以继续你现有的代码:
obs2.filter(isSite) .map(grabData) .subscribe();
请注意,调用subscribe()
会触发源Observable的生成值。 这不一定总是这样,取决于你使用的Observable。 请参阅文档中的“热”和“冷”观察点 。 运算符connect()
可能对您有用,具体取决于您的用例。