使用Node.js实时读取文件

我需要制定出最好的方法来实时读取正在写入文件的数据,使用node.js。 麻烦的是,Node是一个快速移动的船只,它使寻找解决困难问题的最佳方法成为可能。

我想做的事
我有一个java进程正在做一些事情,然后把这个东西的结果写到一个文本文件中。 它通常需要5分钟到5小时的任何时间运行,数据一直写入,并且可以达到一些相当高的吞吐速率(大约1000行/秒)。

我想实时读取这个文件,然后使用节点聚合数据,并将其写入一个套接字,在套接字上可以绘制在客户端上。

客户端,图表,套接字和聚集逻辑都已经完成,但是我对读取文件的最佳方法感到困惑。

我所尝试过的(或至less玩过)
FIFO – 我可以告诉我的Java进程写入一个fifo,并使用节点读取,实际上我们是如何使用Perl来实现这个function的,但是因为一切都在节点上运行,所以移植代码是有意义的。

Unix Sockets – 如上所述。

fs.watchFile – 这将工作,我们需要什么?

fs.createReadStream – 这比fs.createReadStream更好吗?

fstail -f – 看起来像一个黑客。

其实是我的问题
我倾向于使用Unix套接字,这似乎是最快的select。 但节点有更好的内置function实时读取文件从FS?

如果您希望将文件保存为数据的持久存储以防止在系统崩溃或正在运行的进程networking中的某个成员死亡的情况下丢失stream,则仍然可以继续写入文件并阅读从中。

如果你不需要这个文件作为你的Java进程产生结果的持久存储,那么使用Unix套接字对于易用性和性能来说都是好的。

fs.watchFile()不是你所需要的,因为它在文件统计信息上工作,因为文件系统报告它,并且由于你想要读取已经被写入的文件,所以这不是你想要的。

SHORT UPDATE:我非常抱歉意识到,尽pipe我在前面的段落中使用了fs.watchFile()来使用文件统计信息,但是我在下面的示例代码中自己也做了同样的事情! 虽然我已经告诫读者“保重!” 因为我在几分钟内写完了,甚至没有经过很好的testing。 但是,如果底层系统支持,则可以使用fs.watch()而不是fstatSyncfstatSync来完成。

对于从文件中读取/写入,我在下面写了rest时的乐趣:

test-fs-writer.js :[你不需要这个,因为你在Java过程中写文件]

 var fs = require('fs'), lineno=0; var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); stream.on('open', function() { console.log('Stream opened, will start writing in 2 secs'); setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); }); 

test-fs-reader.js :[小心,这只是演示,检查错误的对象!]

 var fs = require('fs'), bite_size = 256, readbytes = 0, file; fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); function readsome() { var stats = fs.fstatSync(file); // yes sometimes async does not make sense! if(stats.size<readbytes+1) { console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); setTimeout(readsome, 3000); } else { fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); } } function processsome(err, bytecount, buff) { console.log('Read', bytecount, 'and will process it now.'); // Here we will process our incoming data: // Do whatever you need. Just be careful about not using beyond the bytecount in buff. console.log(buff.toString('utf-8', 0, bytecount)); // So we continue reading from where we left: readbytes+=bytecount; process.nextTick(readsome); } 

您可以安全地避免使用nextTick ,而是直接调用readsome() 。 由于我们仍然在这里同步工作,所以没有任何必要。 我只是喜欢它。 :p

奥利弗·劳埃德编辑

以上面的例子,但扩展它来读取CSV数据给出:

 var lastLineFeed, lineArray; function processsome(err, bytecount, buff) { lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); if(lastLineFeed > -1){ // Split the buffer by line lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); // Then split each line by comma for(i=0;i<lineArray.length;i++){ // Add read rows to an array for use elsewhere valueArray.push(lineArray[i].split(',')); } // Set a new position to read from readbytes+=lastLineFeed+1; } else { // No complete lines were read readbytes+=bytecount; } process.nextTick(readFile); } 

你为什么觉得tail -f是黑客?

虽然搞清楚我find了一个很好的例子,我会做类似的事情。 使用node.js和WebSocket的实时在线活动监视器示例:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

为了完成这个答案,我给你写了一个可以在0.8.0下运行的例子代码 – (http服务器也许是一个黑客)。

一个subprocess生成尾部运行,并且由于一个subprocess是一个带有三个stream的EventEmitter(我们在我们的例子中使用stdout),你可以添加一个监听器

文件名: tailServer.js

用法: node tailServer /var/log/filename.log

 var http = require("http"); var filename = process.argv[2]; if (!filename) return console.log("Usage: node tailServer filename"); var spawn = require('child_process').spawn; var tail = spawn('tail', ['-f', filename]); http.createServer(function (request, response) { console.log('request starting...'); response.writeHead(200, {'Content-Type': 'text/plain' }); tail.stdout.on('data', function (data) { response.write('' + data); }); }).listen(8088); console.log('Server running at http://127.0.0.1:8088/'); 

这个模块是@hasanyasinbuild议的一个原则的实现:

https://github.com/felixge/node-growing-file

我从@hasanyasin那里得到答案,并把它包装成一个模块化的承诺。 基本的想法是你传递一个文件和一个处理函数来处理从文件中读取的string化的缓冲区。 如果处理函数返回true,那么文件将不会被读取。 你也可以设置一个超时,如果处理程序不能足够快地返回true,将会终止读取。

如果由于超时而调用了resolve(),promiser将返回true,否则返回false。

查看使用示例的底部。

 // https://stackoverflow.com/a/11233045 var fs = require('fs'); var Promise = require('promise'); class liveReaderPromiseMe { constructor(file, buffStringHandler, opts) { /* var opts = { starting_position: 0, byte_size: 256, check_for_bytes_every_ms: 3000, no_handler_resolution_timeout_ms: null }; */ if (file == null) { throw new Error("file arg must be present"); } else { this.file = file; } if (buffStringHandler == null) { throw new Error("buffStringHandler arg must be present"); } else { this.buffStringHandler = buffStringHandler; } if (opts == null) { opts = {}; } if (opts.starting_position == null) { this.current_position = 0; } else { this.current_position = opts.starting_position; } if (opts.byte_size == null) { this.byte_size = 256; } else { this.byte_size = opts.byte_size; } if (opts.check_for_bytes_every_ms == null) { this.check_for_bytes_every_ms = 3000; } else { this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; } if (opts.no_handler_resolution_timeout_ms == null) { this.no_handler_resolution_timeout_ms = null; } else { this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; } } startHandlerTimeout() { if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { var that = this; this._handlerTimer = setTimeout( function() { that._is_handler_timed_out = true; }, this.no_handler_resolution_timeout_ms ); } } clearHandlerTimeout() { if (this._handlerTimer != null) { clearTimeout(this._handlerTimer); this._handlerTimer = null; } this._is_handler_timed_out = false; } isHandlerTimedOut() { return !!this._is_handler_timed_out; } fsReadCallback(err, bytecount, buff) { try { if (err) { throw err; } else { this.current_position += bytecount; var buff_str = buff.toString('utf-8', 0, bytecount); var that = this; Promise.resolve().then(function() { return that.buffStringHandler(buff_str); }).then(function(is_handler_resolved) { if (is_handler_resolved) { that.resolve(false); } else { process.nextTick(that.doReading.bind(that)); } }).catch(function(err) { that.reject(err); }); } } catch(err) { this.reject(err); } } fsRead(bytecount) { fs.read( this.file, new Buffer(bytecount), 0, bytecount, this.current_position, this.fsReadCallback.bind(this) ); } doReading() { if (this.isHandlerTimedOut()) { return this.resolve(true); } var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; if (max_next_bytes) { this.fsRead( (this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size ); } else { setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); } } promiser() { var that = this; return new Promise(function(resolve, reject) { that.resolve = resolve; that.reject = reject; that.doReading(); that.startHandlerTimeout(); }).then(function(was_resolved_by_timeout) { that.clearHandlerTimeout(); return was_resolved_by_timeout; }); } } module.exports = function(file, buffStringHandler, opts) { try { var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); return live_reader.promiser(); } catch(err) { return Promise.reject(err); } }; 

然后使用这样的代码:

 var fs = require('fs'); var path = require('path'); var Promise = require('promise'); var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); var ending_str = '_THIS_IS_THE_END_'; var test_path = path.join('E:/tmp/test.txt'); var s_list = []; var buffStringHandler = function(s) { s_list.push(s); var tmp = s_list.join(''); if (-1 !== tmp.indexOf(ending_str)) { // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true return true; // you can also return a promise: // return Promise.resolve().then(function() { return true; } ); } }; var appender = fs.openSync(test_path, 'a'); try { var reader = fs.openSync(test_path, 'r'); try { var options = { starting_position: 0, byte_size: 256, check_for_bytes_every_ms: 3000, no_handler_resolution_timeout_ms: 10000, }; liveReadAppendingFilePromiser(reader, buffStringHandler, options) .then(function(did_reader_time_out) { console.log('reader timed out: ', did_reader_time_out); console.log(s_list.join('')); }).catch(function(err) { console.error('bad stuff: ', err); }).then(function() { fs.closeSync(appender); fs.closeSync(reader); }); fs.write(appender, '\ncheck it out, I am a string'); fs.write(appender, '\nwho killed kenny'); //fs.write(appender, ending_str); } catch(err) { fs.closeSync(reader); console.log('err1'); throw err; } } catch(err) { fs.closeSync(appender); console.log('err2'); throw err; }