从事件处理程序写入WriteStream
我有一个EventEmitter
对象,我设置了侦听事件。 当事件发射时,我想写信息到文件。 我有一个开放的FileStream
通过fs.createWriteStream(path, { flags: 'a'});
目前,我的问题是,如果我经常以超快的速度发射事件,我就开始“备份”了。 IE .write
返回false
要求我停止写一会儿。 由于我在事件处理程序中进行写操作,因此附近没有用于指示写入过程结束的callback函数。 我可以从处理或排放方面做什么来防止备份?
最终,这似乎并不重要; 所有的数据都被写入文件。 但是我想尽可能地遵循“规则”。
我知道我可以听取drain
事件,然后再开始写作,但是如何防止其他事件进入处理程序? 我注意到,如果我在每个发射之前放50ms延迟,备份似乎不会发生,但似乎有点像黑客。 另外如果你有一个较慢的硬盘?
以下是我的情况的一个例子:
var ee = new EventEmitter(); var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} ); ee.on('report', function (i) { stream.write('new file data ' + i + ' --- ' + Date.now + '\n'); }); for (var i = 0; i < 10000; ++i) { ee.emit('report', i) }
这不是确切的代码,但这是它的要点。 当一个正在运行的HTTP服务器发送响应时会发生完整的代码,但是如果我像1000个请求一样排队,通过一个for循环,我就会遇到上述情况。
我实际上最终find一个更简单的解决这个问题,使用读取和写入stream。 有关示例,请参阅下面的代码
var stream = require('stream'); var fs = require('fs'); var EventEmitter = require('events').EventEmitter; var ee = new EventEmitter(); var writeStream = fs.createWriteStream('./file/log.txt', { flags: 'a', end: false } ); var readStream = new stream.Readable(); // This needs to be here for compatibility reasons, but is intentionally a no-op readStream._read = function() {}; ee.on('report', function (i) { readStream.push(i.toString()); }); readStream.pipe(writeStream); for (var i = 0; i < 10000; ++i) { ee.emit('report', i); }
这将允许节点pipe道和stream系统与OS配合处理背压。 这是IMO这个问题的首选方法。
处理这个问题的理想方法是pause()
传入的事件,如果事件来自stream,或者以某种方式暂停,则可以执行这些事件,但这并不总是可能的。
如果不能以某种方式暂停传入的事件,那么我通常使用async
模块的queue
函数来处理这个事件。 当然还有很多其他的方法可以做到这一点,但是使用队列是我find的最简单的方法,而async
模块(这对于大量的asynchronous操作来说很好)提供了一个很好的方法。
基本的想法是把你所有的write
调用放到一个configuration为一次只能处理1个任务的队列中。 如果从stream.write
调用中返回false
,则pause()
queue
。 一旦从stream
获取drain
事件,就可以再次resume()
队列。 这样你就不会在stream
饱和的时候写信给stream
,但是你仍然可以接收事件,并在stream
准备好之后排队。
用你的例子代码做这件事情看起来像这样:
var async = require('async'); var ee = new EventEmitter(); var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} ); // Create a queue with a concurrency of 1 var writeQueue = async.queue(function(data, callback) { if (!stream.write(data)) { // if write() returns false, it's saturated; pause the queue writeQueue.pause(); } callback(); }, 1); // <-- concurrency argument here; it's easy to miss ;) stream.on('drain', function() { // the stream isn't saturated anymore; resume the queue writeQueue.resume(); }) ee.on('report', function (i) { // instead of writing directly to the stream, push data to the writeQueue writeQueue.push('new file data ' + i + ' --- ' + Date.now() + '\n'); }); for (var i = 0; i < 10000; ++i) { ee.emit('report', i) }
注意:这与从内部stream缓冲事物并不完全不同。 你仍然在缓冲数据,你只是自己做,这让你更好地控制情况。