如何将 RxJS Observable 的结果作为 Node.js 的 REST 响应返回

场景:来自多个 REST 调用的数据必须聚合为一个对象,并作为由 Node.js 处理的初始请求的 REST 响应返回。

问题:REST 响应不会等待 Observable 完成,因此对数据的修改(聚合)发生在 REST 响应已经发送之后。

//teamsController class invoked via GET /teams import * as Rx from 'rxjs/Rx' import http from 'axios' import Teams from '../models/teams' const teamsAPI = "http://localhost:8081/api/v1/teams/players/"; const usersAPI = "http://localhost:8082/api/v1/users/"; exports.getTeamByPlayer = function (req, res) { let username= req.get("username"); Rx.Observable.fromPromise(fetchTeam(username)) .map(team => { Rx.Observable.from(team.players).subscribe(player => { console.log(`Player name is ${player.username}`); Rx.Observable.fromPromise(fetchUser(player.username)) .map(avatar => avatar.avatar) .subscribe(avatar => { player.avatar = avatar; console.log(player) }) }); return team; }) .subscribe(result => { console.log(`result is ${JSON.stingify(result)}`); res.json(result); }) } /** * Fetch a team by a player * * @param name The name of the team * @returns {Promise.<Teams>} */ function fetchTeam(name) { return http.get(teamsAPI + name) .then(response => new Teams(response.data.data)) .catch(error => { throw new Error("todo: fill error message"); }) } /** * Fetch a user given its username * * @param username The username of the player * @returns {Promise.<TResult>} */ function fetchUser(username) { return new Promise(function (resolve, reject) { console.log(`fetching user: ${username}`); resolve(); }).then(() => { return { "avatar": { "flagColor": "dummyValue", "flagCrest": "dummyValue" } } }); 

日志输出:

 Player name is username1 fetching user: username1 Player name is username2 fetching user: username2 result is {"id":"5a1c2a4030c39e5d88aed087","name":null,"avatar":{"flagColor":"abcdefg","flagCrest":"hijklmn"},"description":"string","motto":"string","players":[{"userId":"59b94a7b8b68ef0a048e85c1","username":"username1","status":"ACTIVE","role":"ADMIN","dateJoined":1511795264314,"dateInvited":null,"score":0},{"userId":"59b94a7b8b68ef0a048e85c1","username":"username2","status":"ACTIVE","role":"MEMBER","dateJoined":1511795264314,"dateInvited":null,"score":0}],"score":0,"type":"TEAM","open":true,"location":{"longitude":0,"latitude":0,"country":"string"},"owner":"username1","dateCreated":1511795264314} { userId: '59b94a7b8b68ef0a048e85c1', username: 'username1', status: 'ACTIVE', role: 'ADMIN', dateJoined: 1511795264314, dateInvited: null, score: 0, avatar: { flagColor: 'dummyValue', flagCrest: 'dummyValue' } } { userId: '59b94a7b8b68ef0a048e85c1', username: 'username2', status: 'ACTIVE', role: 'MEMBER', dateJoined: 1511795264314, dateInvited: null, score: 0, avatar: { flagColor: 'dummyValue', flagCrest: 'dummyValue' } } 

运行环境:Node.js v8.7.0;中间件:Express 4.16.2;依赖库:RxJS 5.0.0-beta.12、axios 0.17.1

首先,不要再把 HTTP 请求转换成 Promise。这只会让事情变得比实际需要的更复杂。

 /**
  * Fetch a team by the username of one of its players, as an Observable.
  *
  * @param name The username of the player
  * @returns {Observable.<Teams>} Emits the team once, then completes.
  */
 function fetchTeam(name) {
   // axios' http.get returns a Promise (it has no .map, and the payload is
   // exposed as response.data rather than response.json()), so wrap it in an
   // Observable. defer keeps the request lazy: it starts on subscription.
   return Rx.Observable
     .defer(() => Rx.Observable.fromPromise(http.get(teamsAPI + encodeURIComponent(name))))
     .map(response => new Teams(response.data.data));
 }

 /**
  * Fetch a user given its username, as an Observable.
  *
  * Currently a stub: emits a fixed dummy avatar and ignores the username.
  *
  * @param username The username of the player
  * @returns {Observable.<{avatar: {flagColor: string, flagCrest: string}}>}
  */
 function fetchUser(username) {
   return Rx.Observable.of({
     "avatar": {
       "flagColor": "dummyValue",
       "flagCrest": "dummyValue"
     }
   });
 }

如果某些外部调用方需要基于 Promise 的 API,那么就另外提供公开函数,把这些私有函数包装成 Promise 再暴露出去。

其次,map 是一个同步操作符,不能在其中执行异步操作;你需要使用一个会替你订阅内部 Observable 的异步操作符(例如 switchMap)。如果你发现自己在 Observable 流内部手动调用 subscribe,那就说明做法有问题。

 // Body of the request handler: `req` and `res` come from the enclosing
 // handler function; fetchTeam/fetchUser are the Observable-returning helpers.
 const username = req.get("username");

 fetchTeam(username)
   // switchMap subscribes to the inner observable, so nothing reaches
   // subscribe() until every avatar has been fetched and attached.
   .switchMap(team => {
     // forkJoin over an empty array completes without emitting, which would
     // leave the request unanswered; pass the team through unchanged instead.
     if (team.players.length === 0) {
       return Rx.Observable.of(team);
     }

     // Note the two different maps: Array#map builds one observable per
     // player, and the Rx map inside unwraps the avatar from each user.
     const avatars$ = team.players.map(player =>
       fetchUser(player.username).map(user => user.avatar)
     );

     // forkJoin runs all lookups and emits their results in input order.
     return Rx.Observable.forkJoin(avatars$).map(avatars => {
       team.players.forEach((player, idx) => {
         player.avatar = avatars[idx];
       });
       return team;
     });
   })
   .subscribe(
     result => {
       console.log(`result is ${JSON.stringify(result)}`);
       res.json(result);
     },
     error => {
       // Without an error callback a failed lookup would never be answered.
       console.error(error);
       res.status(500).json({ error: error.message });
     }
   );