RxJS – 使用zip并行化使HTTP请求不工作的延迟Observable – 您提供了一个无效的对象,其中stream是预期的

我正在运行一个Node.js API端点,它必须在它的一个端点中对几个HTTP调用进行并列。

以星球大战API为例。 我通过id获取用户信息。 它包含一系列的电影,我想并行检索所有这些电影信息。

得到人的信息后,我创build了一个观察数组,每个人得到一个电影。 我build立了一个zip操作员来组成所有得到的电影的结果。

关键是,在订阅后,我看到n次,每个电影一个, console.log(filmData)信息,这是正确的。 但它似乎与console.log("*********************************************************************"); 不叫。 下一个,错误,完整的callback都不是。

为什么会这样?

 client.get("https://swapi.co/api/people/"+uid, args, (data, response) => { var filmsIt = data.films; for(var i in filmsIt){ var observable = Rx.Observable.defer(function () { client.get(filmsIt[i], args, (filmData, filmResponse) => { console.log(filmData); return filmData; }); }); observables.push(observable); } var observableFinal = Rx.Observable.zip(...observables, function() { console.log("*********************************************************************"); }).subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }); 

更新:马丁的答案,callback被调用。 但是,我现在得到这个错误:

 Error: TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable. 

我已经改变了延迟函数返回一个Observable像这样:

 var observable = Rx.Observable.defer(function () { return Rx.Observable.of(client.get(filmsIt[i], args, (filmData, filmResponse) => { console.log(filmData); return filmData; }); })); 

问题是,现在zip函数没有获得HTTP调用的实际值,而是一堆ClientRequest对象:

  { '0': ClientRequest { domain: null, _events: {}, _eventsCount: 0, _maxListeners: undefined, href: 'https://swapi.co/api/films/7/', options: [Object], _httpRequest: [Object] }, '1': ClientRequest { domain: null, _events: {}, _eventsCount: 0, _maxListeners: undefined, href: 'https://swapi.co/api/films/7/', options: [Object], _httpRequest: [Object] }, ... 

当使用Observable.defer您需要从callback函数(或Promise或其他)中返回Observable,请参阅http://reactivex.io/rxjs/class/es6/MiscJSDoc.js~ObservableInputDoc.html )。

我想你想做这样的事情:

 var observable = Rx.Observable.defer(function () { return client.get(filmsIt[i], args, (filmData, filmResponse) => { console.log(filmData); return filmData; }); }); 

马丁在回答关于需要从延期函数中返回一个有效types的答案时是正确的。

为了清楚起见,我使用Q来解决这个问题,以包装http客户端库,并以这种方式返回一个承诺:

 function httpGet(url, args){ var deferred = Q.defer(); client.get(url, args, (data, response) => { deferred.resolve(data); }); return deferred.promise; } exports.getFilms = (uid) => { var deferred = Q.defer(); var observables = []; var toReturn; httpGet("https://swapi.co/api/people/"+uid, args) .then( (data) => { toReturn = data; data.films.forEach(function (filmsIt){ var observable = Rx.Observable.fromPromise(httpGet(filmsIt, args)); observables.push(observable); }); var observableFinal = Rx.Observable.zip(...observables, function() { toReturn.films = arguments; return arguments; }).subscribe( (x) => {}, (err) => {console.log('Error: ' + err);}, () => { deferred.resolve(toReturn); }); }); return deferred.promise; }