如何asynchronous写入和读取节点中的同一个文件?

我有数据通过websocket进来。 它发送20ms的二进制数据块。 我需要连接每个块,以便后端进程可以读取连续stream的数据。

//Create the file and append binary as it comes in tmp.file({postfix: '.raw' },function (err, path, fd, cleanup) { if (err) throw err; newPath = path fs.appendFile(newPath, new Buffer(binary), (err) => { if (err) throw err; }) }) //Read the file as it is written fs.createReadStream(newPath).pipe(recStream); 

现在,我只是在createReadStream上有一个简单的半秒延迟来确保文件中有数据。

这当然不正确,不工作。 什么是正确的方式去呢?

在这种情况下,最好的办法是告诉服务器,你正在接收数据,暂停,直到你准备好处理更多( drain )。 假设这不是你的select:

首先将传入数据写入目标stream。 如果write(chunk)返回false ,则表示stream的内部缓冲区已满; 是时候开始将后续数据caching到磁盘了。 (刚刚写入的数据chunk会导致返回值false ,请不要将其写入磁盘 – false并不表示写入失败,这只是缓冲区比highWaterMark数据更多的信号。

在一个临时文件夹中,创build一个新文件( A )写入stream,并向其写入下一个传入数据块。 做到这一点,直到您的目的地stream发出drain事件。

当你的目的地drain

  1. 交换缓冲区文件。 closures当前的缓冲文件A并创build一个新的临时文件B ,开始向其写入新的传入数据。
  2. 在临时文件A上打开一个读取stream,并将数据从pipe道传输到目标stream。 你可能不能使用实际的pipe()方法,因为当你到达临时文件的末尾时,它会发出数据的结束,这不是我们想要的,因为它不是所有传入数据的实际结束。 ( 看看pipe()是做什么的 ,然后自己实现,减去调用end() 。)
  3. 当临时文件的streamA发出end ,删除文件A. 然后回到步骤1,再次用文件B开始该过程。 (如果在此期间没有数据写入文件B ,则返回到非缓冲操作,直接将传入数据写入目标stream。)

一旦服务器发出信号表示已完成发送数据, 并且已经从临时文件中读取了所有数据,则将write(null)写入目标stream以表示没有更多数据。 全做完了!

通过在临时缓冲区文件之间进行交换并在数据处理完成后将其删除,您不必担心在写入文件时读取数据。 另外,您不必将整个传入的数据stream缓冲在磁盘上。

当然,这确实假设您的存储介质可以保证接受写入,而不是通过networking接收数据。 这可能是安全的,但如果这个假设是不正确的,事情可能会崩溃。 使用生产系统进行testing – 传入数据的峰值速度是多less,您的产品系统上的磁盘写入速度有多快?