如何链接之前在forkjoin()操作中的每个observable的组

我在MySQL中有一个订单表,每个订单都有一些关联的文档,不pipe它们是报价单,发票等等。因此有第二个称为“documents”的表,它具有“document_id”主键和“ order_id“外键; 以类似的方式,我还有另一个技术人员对每辆车进行检查的情况,然后是另一张车辆图片。 我正在创build一个使用Node和Express的Web服务,需要返回类似于这个的Json …

[ { "order_id": 1003, "customer_id": 8000, "csi": 90, "date_admitted": "2016-10-28T05:00:00.000Z", "plates": "YZG-5125", ... documents: { "type": "invoice", "number": "1234", ... }, checks: { "scanner": "good", "battery": "average", ... }, vehicle_pictures: { "title": "a title...", "path": "the file path" ... } }, { ... }, ... ] 

正如你所看到的,有必要为每个订单做三个查询,一个用于检查,另一个用于文档,第三个用于图片,然后我需要将这些子结果添加到最终返回数组中的顺序。

在旧版的同步编程中这将是一件非常容易的事情,但是由于mysql库的连接对象中的query()方法的asynchronous性质,这个威胁变成了一个真正的地狱。

在需要处理单个订单的情况下,使用forkJoin()在服务器上使用RxJS库就足以一次处理所有三个结果,但我不确定如何“链接”每个订单(使用forkJoin来pipe理3个查询),所以一切都得到处理,最后我可以调用res.json(结果),一切整齐。

注意:我想用RxJS来解决这个问题,而不是使用像node-mysql-libmysqlclient这样的同步库软件包。 原因基本上是像NodeJS那样在asynchronous语言中这样做的“正确”方式是asynchronous的。 此外,我想使用RxJS而不是asynchronous,Q承诺或任何其他库,因为Observables似乎是asynchronous解决scheme竞赛中的绝对赢家,也希望在开发的所有解决scheme中保持一致,所以这个问题主要针对RxJS大师。

我发现的每一个问题都与传统的“纯粹主义”的答案一样,如果你使用Node,你应该“使用asynchronous”,而不要在同步解决scheme中思考。 所以对于那些捍卫这个位置的人来说,这是一个挑战,因为这个(我认为)是Node中同步的情况之一,但是我真的想学习如何与RxJS做到这一点,而不是认为这是不可能的,我相信这不是。

如果我正确地理解了这些事情,则可以使用一些数据来通过asynchronous操作从数据库中收集更多数据。 您要构build由原始数据和后续查询返回的附加信息组成的组合数据集。

正如你所提到的,你可以在继续之前使用forkJoin来等待多个操作完成。 您必须为数据序列中的每个项目执行此操作,然后使用switchMap将结果合并回原始stream。

看看下面的示例jsbin ,演示如何做到这一点:

 const data = [ { id: 1, init: 'a' }, { id: 2, init: 'b' }, { id: 3, init: 'c' } ] function getA(id) { return Rx.Observable.timer(1000) .map(() => { return { id, a: 'abc' } }) .toPromise(); } function getB(id) { return Rx.Observable.timer(1500) .map(() => { return { id, b: 'def' } }) .toPromise(); } Rx.Observable.interval(5000) .take(data.length) .map(id => data[id]) .do(data => { console.log(`query id ${data.id}`)}) .switchMap((data) => { return Rx.Observable.forkJoin(getA(data.id), getB(data.id), (a, b) => { console.log(`got results for id ${data.id}`); return Object.assign({}, data, a, b); }); }) .subscribe(x => console.log(x));