asynchronous等待或承诺不返回stream事件

我有以下函数返回一个Promise函数参数是一个asynchronous函数:

createObjectFrom(record) { const self = this; return new Promise(async (resolve, reject) => { let obj = {}; for(let i = 0, l = self.opts.transformers.length; i < l; i++) { let transformer = self.opts.transformers[i]; const headerIndex = findIndex(self.headers, (header) => { return header === transformer.column; }); let csvValue = record[headerIndex]; const lookUp = transformer.options.lookUp; const whereClause = {}; whereClause[lookUp.column] = csvValue; console.log('before await'); const result = await self.knex(lookUp.table).where(whereClause).select(lookUp.scalar); console.dir(result); obj[transformer.field] = result[0][lookUp.scalar]; } return resolve(obj); }); } 

如果我在testing中调用这样的函数,它将全部正确执行:

  it('creates the transformed object', async () => { const csvRecord = ['PREMIER', '07/11/1998', manager, 'Liverpool', 'Wimbledon', 0, 1, 'A', 0, 1, 'A']; const record = await transformer.createObjectFrom(csvRecord); expect(record.division).to.equal('PREMIER'); } 

但是,在从csv-parse创build的stream中引发的可读事件中调用createObjectFrom函数时:

  onReadable() { let record = this.parser.read(); if (record === null) { return; } if (this.parser.count <= 1) { this.headers = record; } else { const recordPromises = this.createObjectFrom(record); this.recordPromises.push( newRecord ); } } 

代码到达createObjectFromconsole.log语句

  console.log('before here'); const result = await self.knex(lookUp.table).where(whereClause).select(lookUp.scalar); console.dir(result); 

但是,由于Promise似乎没有解决,所以没有进入下面的console.dir语句。

如果我从stream处理之外的testing中调用createObjectFrom ,那么它会正确parsing。

我也尝试重构asynchronous等待出来这只是返回一个承诺,但它仍然是坏的。

 If I console.dir the promises on the [end][3] event of the stream they look like this: [ Promise { _bitField: 0, _fulfillmentHandler0: undefined, _rejectionHandler0: undefined, _promise0: undefined, _receiver0: undefined }, Promise { _bitField: 0, _fulfillmentHandler0: undefined, _rejectionHandler0: undefined, _promise0: undefined, _receiver0: undefined } ] 

我有这个回购有源代码和一个失败的testing。

我很困惑,到底发生了什么。

下面的testing也通过了,所以肯定是与stream有关的东西:

  it('creates the transformed object', async () => { const csvRecords = [ ['PREMIER', '07/11/1998', manager, 'Liverpool', 'Wimbledon', 0, 1, 'A', 0, 1, 'A'], ['PREMIER', '11/11/1998', manager, 'QPR', 'Sunderland',3,3, 'Sunderland',0,0,'Sunderland'], ['PREMIER', '14/11/1998', manager, 'Southampton', 'Liverpool', 3, 3, 'D', 0, 0, 'D'] ]; for(var i = 0, l = csvRecords.length; i < l; i++) { const csvRecord = csvRecords[i]; const record = await transformer.createObjectFrom(csvRecord); expect(record.division).to.equal('PREMIER'); expect(record.manager_id).to.equal(manager_id); } } 

为什么你不改变你的代码是这样的:

 createObjectFrom: async (record) => { const self = this; // return new Promise(async (resolve, reject) => { let obj = {}; for(let i = 0, l = self.opts.transformers.length; i < l; i++) { let transformer = self.opts.transformers[i]; const headerIndex = findIndex(self.headers, (header) => { return header === transformer.column; }); let csvValue = record[headerIndex]; const lookUp = transformer.options.lookUp; const whereClause = {}; whereClause[lookUp.column] = csvValue; console.log('before await'); const result = await self.knex(lookUp.table).where(whereClause).select(lookUp.scalar); console.dir(result); obj[transformer.field] = result[0][lookUp.scalar]; } return obj; // return resolve(obj); // }); } 

OMG,这是一个屁股,那是因为我没有从testing中返回诺言。 在使用摩卡testing承诺时,您需要从该函数返回承诺。 如果你这样做,这一切工作。 真是个蠢才!

所以如果我改变这个testing:

  describe('transformer', () => { it('transforms the data and imports the csv file', () => { const ignoreIf = (data) => data[3] !== 'Liverpool' && data[4] !== 'Liverpool'; const opts = { table: 'results', file: __dirname + '/fixtures/test.csv', encoding: 'utf8', transformers, ignoreIf: ignoreIf }; const f = seeder(opts)(knex, Promise); return f.then((results) => { console.dir(results); }); }); 

return f ,现在一切都很好。