如何asynchronous写入和读取节点中的同一个文件?
我有数据通过websocket进来。 它发送20ms的二进制数据块。 我需要连接每个块,以便后端进程可以读取连续stream的数据。
//Create the file and append binary as it comes in tmp.file({postfix: '.raw' },function (err, path, fd, cleanup) { if (err) throw err; newPath = path fs.appendFile(newPath, new Buffer(binary), (err) => { if (err) throw err; }) }) //Read the file as it is written fs.createReadStream(newPath).pipe(recStream);
现在,我只是在createReadStream
上有一个简单的半秒延迟来确保文件中有数据。
这当然不正确,不工作。 什么是正确的方式去呢?
在这种情况下,最好的办法是告诉服务器,你正在接收数据,暂停,直到你准备好处理更多( drain
)。 假设这不是你的select:
首先将传入数据写入目标stream。 如果write(chunk)
返回false
,则表示stream的内部缓冲区已满; 是时候开始将后续数据caching到磁盘了。 (刚刚写入的数据chunk
会导致返回值false
,请不要将其写入磁盘 – false
并不表示写入失败,这只是缓冲区比highWaterMark
数据更多的信号。
在一个临时文件夹中,创build一个新文件( A )写入stream,并向其写入下一个传入数据块。 做到这一点,直到您的目的地stream发出drain
事件。
当你的目的地drain
:
- 交换缓冲区文件。 closures当前的缓冲文件A并创build一个新的临时文件B ,开始向其写入新的传入数据。
- 在临时文件A上打开一个读取stream,并将数据从pipe道传输到目标stream。 你可能不能使用实际的
pipe()
方法,因为当你到达临时文件的末尾时,它会发出数据的结束,这不是我们想要的,因为它不是所有传入数据的实际结束。 ( 看看pipe()
是做什么的 ,然后自己实现,减去调用end()
。) - 当临时文件的streamA发出
end
,删除文件A. 然后回到步骤1,再次用文件B开始该过程。 (如果在此期间没有数据写入文件B ,则返回到非缓冲操作,直接将传入数据写入目标stream。)
一旦服务器发出信号表示已完成发送数据, 并且已经从临时文件中读取了所有数据,则将write(null)
写入目标stream以表示没有更多数据。 全做完了!
通过在临时缓冲区文件之间进行交换并在数据处理完成后将其删除,您不必担心在写入文件时读取数据。 另外,您不必将整个传入的数据stream缓冲在磁盘上。
当然,这确实假设您的存储介质可以保证接受写入,而不是通过networking接收数据。 这可能是安全的,但如果这个假设是不正确的,事情可能会崩溃。 使用生产系统进行testing – 传入数据的峰值速度是多less,您的产品系统上的磁盘写入速度有多快?