async/await or Promise does not resolve inside a stream event
I have the following function, which returns a Promise whose executor is an async function:
    createObjectFrom(record) {
      const self = this;

      return new Promise(async (resolve, reject) => {
        let obj = {};

        for(let i = 0, l = self.opts.transformers.length; i < l; i++) {
          let transformer = self.opts.transformers[i];

          const headerIndex = findIndex(self.headers, (header) => {
            return header === transformer.column;
          });

          let csvValue = record[headerIndex];

          const lookUp = transformer.options.lookUp;

          const whereClause = {};
          whereClause[lookUp.column] = csvValue;

          console.log('before await');
          const result = await self.knex(lookUp.table).where(whereClause).select(lookUp.scalar);
          console.dir(result);

          obj[transformer.field] = result[0][lookUp.scalar];
        }

        return resolve(obj);
      });
    }
If I call the function like this from a test, it all executes correctly:
    it('creates the transformed object', async () => {
      const csvRecord = ['PREMIER', '07/11/1998', manager, 'Liverpool', 'Wimbledon', 0, 1, 'A', 0, 1, 'A'];

      const record = await transformer.createObjectFrom(csvRecord);

      expect(record.division).to.equal('PREMIER');
    });
However, things go wrong when createObjectFrom is called from the readable event raised by a stream created with csv-parse:
    onReadable() {
      let record = this.parser.read();

      if (record === null) {
        return;
      }

      if (this.parser.count <= 1) {
        // The first record holds the column headers.
        this.headers = record;
      } else {
        // Keep the pending promise so it can be awaited later.
        const recordPromise = this.createObjectFrom(record);
        this.recordPromises.push(recordPromise);
      }
    }
The code reaches the console.log statement in createObjectFrom:
    console.log('before await');
    const result = await self.knex(lookUp.table).where(whereClause).select(lookUp.scalar);
    console.dir(result);
However, it never reaches the console.dir statement below it, because the Promise never seems to resolve.
If I call createObjectFrom from a test, outside of the stream processing, it resolves correctly.
I also tried refactoring out the async/await so that the function just returns a promise, but it is still broken.
If I console.dir the promises on the end event of the stream, they look like this:

    [ Promise {
        _bitField: 0,
        _fulfillmentHandler0: undefined,
        _rejectionHandler0: undefined,
        _promise0: undefined,
        _receiver0: undefined },
      Promise {
        _bitField: 0,
        _fulfillmentHandler0: undefined,
        _rejectionHandler0: undefined,
        _promise0: undefined,
        _receiver0: undefined } ]
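I have this repo with the source code and a failing test.
I am baffled as to what is going on.
The following test also passes, so it is definitely something to do with the stream: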
    it('creates the transformed object', async () => {
      const csvRecords = [
        ['PREMIER', '07/11/1998', manager, 'Liverpool', 'Wimbledon', 0, 1, 'A', 0, 1, 'A'],
        ['PREMIER', '11/11/1998', manager, 'QPR', 'Sunderland', 3, 3, 'Sunderland', 0, 0, 'Sunderland'],
        ['PREMIER', '14/11/1998', manager, 'Southampton', 'Liverpool', 3, 3, 'D', 0, 0, 'D']
      ];

      for(var i = 0, l = csvRecords.length; i < l; i++) {
        const csvRecord = csvRecords[i];

        const record = await transformer.createObjectFrom(csvRecord);

        expect(record.division).to.equal('PREMIER');
        expect(record.manager_id).to.equal(manager_id);
      }
    });
Why don't you change your code to something like this:
    // Declared as an async method rather than an arrow function,
    // so that `this` still refers to the instance.
    async createObjectFrom(record) {
      const self = this;

      // return new Promise(async (resolve, reject) => {
      let obj = {};

      for(let i = 0, l = self.opts.transformers.length; i < l; i++) {
        let transformer = self.opts.transformers[i];

        const headerIndex = findIndex(self.headers, (header) => {
          return header === transformer.column;
        });

        let csvValue = record[headerIndex];

        const lookUp = transformer.options.lookUp;

        const whereClause = {};
        whereClause[lookUp.column] = csvValue;

        console.log('before await');
        const result = await self.knex(lookUp.table).where(whereClause).select(lookUp.scalar);
        console.dir(result);

        obj[transformer.field] = result[0][lookUp.scalar];
      }

      // An async function already returns a promise that resolves to obj.
      return obj;
      // return resolve(obj);
      // });
    }
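One reason to drop the wrapper: when an async function is used as the Promise executor, an error thrown inside it never reaches reject, so the outer promise can stay pending forever. A plain async function propagates both results and failures. The sketch below illustrates this with a made-up `fakeLookup` standing in for the knex query; the names `fakeLookup` and `createObject` are hypothetical and not part of the original code.

```javascript
// Hypothetical stand-in for the knex lookup: resolves to rows, or rejects.
const fakeLookup = (value) =>
  value === 'missing'
    ? Promise.reject(new Error('lookup failed'))
    : Promise.resolve([{ id: value.length }]);

// A plain async function: the returned promise resolves with the return
// value, and rejects if an awaited call rejects or the body throws.
async function createObject(values) {
  const obj = {};
  for (const value of values) {
    const rows = await fakeLookup(value);
    obj[value] = rows[0].id;
  }
  return obj;
}

createObject(['ab', 'cde']).then((obj) => console.dir(obj));
createObject(['missing']).catch((err) => console.log('rejected:', err.message));
```

No explicit resolve or reject call is needed; the caller sees the failure through the returned promise.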
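OMG, I am such an ass. This happened because I was not returning the promise from the test. When testing promises with mocha, you need to return the promise from the test function. If you do that, it all works. What a dolt!
So if I change the test to this: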
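To see why the return matters, here is a rough sketch of the idea. `runTest` is a hypothetical miniature runner written for illustration only, not mocha's actual implementation, and `slowWork` is a made-up stand-in for the seeder. The runner can only wait for work when the test function hands it a promise.

```javascript
// Hypothetical miniature runner, NOT mocha's real implementation.
// It only waits when the test function hands back a thenable.
async function runTest(testFn) {
  const returned = testFn();
  if (returned && typeof returned.then === 'function') {
    await returned;
    return 'waited for promise';
  }
  return 'finished immediately';
}

// Hypothetical slow operation standing in for the seeder.
const slowWork = () =>
  new Promise((resolve) => setTimeout(() => resolve('done'), 10));

const seenWithoutReturn = [];
const seenWithReturn = [];

// The promise is created but dropped, so the runner cannot wait for it.
const testWithoutReturn = () => {
  slowWork().then((value) => seenWithoutReturn.push(value));
};

// The promise is returned, so the runner waits until it settles.
const testWithReturn = () =>
  slowWork().then((value) => seenWithReturn.push(value));
```

With `testWithoutReturn`, the runner reports the test as finished while the asynchronous work is still pending, which matches the symptom of code after the await never appearing to run.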
    describe('transformer', () => {
      it('transforms the data and imports the csv file', () => {
        const ignoreIf = (data) => data[3] !== 'Liverpool' && data[4] !== 'Liverpool';

        const opts = {
          table: 'results',
          file: __dirname + '/fixtures/test.csv',
          encoding: 'utf8',
          transformers,
          ignoreIf: ignoreIf
        };

        const f = seeder(opts)(knex, Promise);

        // Returning the promise makes mocha wait for it to settle.
        return f.then((results) => {
          console.dir(results);
        });
      });
    });
I return f, and now everything works.
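Returning the promise only helps if that promise settles after every per-record promise has settled. The sketch below shows one way a stream consumer might arrange that, using Node's built-in `stream.Readable` in place of csv-parse. The helper name `collectRecords` and its `transform` parameter are assumptions made for illustration, not the seeder's actual code, and error handling is kept minimal.

```javascript
const { Readable } = require('stream');

// Reads every record from an object-mode stream, starts one async
// transform per record, and resolves once all of them have finished.
function collectRecords(stream, transform) {
  return new Promise((resolve, reject) => {
    const pending = [];

    stream.on('readable', () => {
      let record;
      // Drain everything currently buffered, not just one record.
      while ((record = stream.read()) !== null) {
        pending.push(transform(record));
      }
    });

    stream.on('error', reject);

    // On 'end' the promises may still be pending, so wait for all of them.
    stream.on('end', () => {
      Promise.all(pending).then(resolve, reject);
    });
  });
}

// Usage with made-up records:
const source = Readable.from([['premier'], ['championship']]);
collectRecords(source, async (record) => record[0].toUpperCase())
  .then((results) => console.dir(results));
```

Printing the collected promises directly in the end handler shows them as unsettled for the same reason: the stream has finished, but the lookups have not.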