蓝鸟承诺与事件发射器

我对使用蓝鸟承诺是相当新的。 我试图在发射器上使用它们。 但是,我被困在如何处理错误。

我有一个stream对象是发射器。 代码如下 –

 return new Promise((resolve, reject) => { var onDocFunc = doc => { //JSON.parse('*'); // some logic goes in here to construct setStmt bulk.find(query).upsert().update({$set: setStmt}); count++; if (count % bulkSize == 0) { stream.pause(); var execute = Promise.promisify(bulk.execute); execute().catch(() => {}).then(() => { stream.resume(); }); } }; stream.on('data', onDocFunc); stream.on('end', () => { JSON.parse('*'); // how to catch errors that happen here?? var boundResolve = resolve.bind(this, {count: count}); if (count % bulkSize != 0) { Promise.promisify(bulk.execute)().then(boundResolve).catch(boundResolve); } else { boundResolve(); } }); stream.on('error', err => { reject(err); }); }) 

我想知道什么是推荐的方法来捕获在end事件处理程序的callback内发生的错误? 现在,如果发生任何错误,NodeJS应用程序崩溃与uncaughtException: Unexpected token *

不要将应用程序逻辑混合到事件发射器的promisification中。 这样的代码(可以抛出等)应该总是进入callback。 在你的情况下:

 var execute = Promise.promisify(bulk.execute); return new Promise((resolve, reject) => { stream.on('data', onDocFunc); // not sure what this does stream.on('end', resolve); stream.on('error', reject); }).then(() => { JSON.parse('*'); // exceptions that happen here are caught implicitly! var result = {count: count}; if (count % bulkSize != 0) { return execute().catch(()=>{}).return(result); } else { return result; } }); 

关于你真实的代码,我可能会尝试把批处理分解成一个辅助函数:

 function asyncBatch(stream, size, callback) { var batch = [], count = 0; stream.on('data', data => { batch.push(data); count++; if (batch.length == size) { stream.pause(); Promise.resolve(batch).then(callback).then(() => { batch = []; stream.resume(); }, e => { stream.emit('error', e); }); } }); return new Promise((resolve, reject) => { stream.on('end', resolve); stream.on('error', reject); }).then(() => batch.length ? callback(batch) : null).then(() => count); } Promise.promisifyAll(Bulk); return asyncBatch(stream, bulkSize, docs => { const bulk = new Bulk() for (const doc of docs) { // JSON.parse('*'); // some logic goes in here to construct setStmt bulk.find(query).upsert().update({$set: setStmt}); } return bulk.executeAsync().catch(err => {/* ignore */}); }) 

你将不得不使用try / catch块:

 stream.on('end', () => { try { JSON.parse('*') // ...the rest of your code } catch (e) { reject(e) } })