RxJS节点等待观察完成,然后按顺序订阅下一个

我试图填充我的数据库从谷歌表格中刮取的值。 hasuraHelper.insert()向我的数据库发送一个HTTP邮件。 我怎样才能使一个insert()只在前面的api调用返回之后被调用? 我认为concatMap可以做到这一点,但似乎仍然热切地赞同所有的排放。

当前:请求1 – >请求2 – > … – >请求1完成 – >请求2完成 – > …

我想要的:请求1 – >请求1完成 – >请求2 – >请求2完成 – > …

这是相关的代码:

sheetsHelper.authToken() .flatMap(sheetsHelper.get) .flatMap(response => response.values) //values is a 2d array .map(row => { console.log(row[0]); const flights = []; //add 100-500 objects to this array return flights; }) .flatMap(flights => { const arrayOfFlightArrays = []; //max batch size of 50 while (flights.length) { const flightArr = flights.splice(0, 50); arrayOfFlightArrays.push(flightArr); } return Rx.Observable.from(arrayOfFlightArrays); }) .concatMap(flights => hasuraHelper.insert(flights) //insert flights into table .retry()) .map(response => response.data) .subscribe(console.log, console.error); 

这是插入的样子:

 exports.insert = function (objects) { return Rx.Observable.fromPromise(axios({ method: 'post', url: '/', data: { type: "insert", args: { table: "flights", objects: objects } } })); }; 

你的插入function不是懒惰的。 你正在执行一个Observable的承诺,但它已经在执行了。

为了得到一个懒惰的承诺,并开始处理订阅时,你需要把它包装在Rx.Observable.defer(() => Rx.Observable.fromPromise(axios(/* ... */))