如何使用asynchronousI / O将实时数据集写入磁盘?

我是在node.js中开发的新手(尽pipe在客户端JavaScript方面经验相对丰富),并且在处理node.js中的asynchronous操作时遇到了很多关于良好实践的问题。

我的具体问题(虽然我想这是一个相当通用的主题)是,我有一个node.js应用程序(在树莓派上运行),每隔10秒将数个温度探测器的读数logging到内存数据结构中。 这工作得很好。 数据随着时间在内存中积累,随着数据积累并达到特定的大小阈值,数据会定期老化(只保留最后N天的数据)以防止数据增长超过一定的大小。 这个温度数据被用来控制一些其他的电器。

然后,我有一个单独的间隔计时器,每隔一段时间将这些数据写出到磁盘(如果进程崩溃,则保留它)。 我使用asynchronousnode.js( fs.open()fs.write()fs.close() )磁盘IO将数据写入磁盘。

而且,由于磁盘IO的asynchronous特性,在我看来,我试图写入磁盘的数据结构可能会在我写入磁盘的过程中被修改。 这可能是一件坏事。 如果在将数据写入磁盘时数据只附加到数据结构中,那么实际上并不会导致写入数据的方式出现问题,但是在某些情况下,在logging新数据时可能会修改较早的数据这真的会弄乱我在写入磁盘过程中的完整性。

我可以想到我可以在我的代码中使用各种有点丑陋的保护措施,例如:

  1. 切换到同步IO以将数据写入磁盘(对于服务器响应的原因,实际上并不想这么做)。
  2. 设置一个标志,当我开始写数据,并没有logging任何新的数据,而该标志设置(导致我在写入过程中丢失的数据logging)。
  3. 选项2更复杂的版本,我设置标志和设置标志时,新的数据进入一个单独的临时数据结构,当文件IO完成,然后合并真实的数据(可行,但似乎丑陋)。
  4. 拍摄原始数据的快照副本,并花时间将该副本写入磁盘,知道没有人会修改副本。 我不想这样做,因为数据集是相当大的,我在有限的内存环境(树莓派)。

所以,我的问题是当asynchronousIO的其他操作可能需要修改该数据时用什么devise模式来编写一个大的数据集? 处理我的问题的方法是否比上面列出的具体解决方法更多?

你的问题是数据同步 。 传统上这是用锁/互斥锁来解决的,但是javascript / node并没有像内build的那样。

那么,我们如何在节点中解决这个问题呢? 我们使用队列。 就个人而言,我使用asynchronous模块的队列function。

一旦上一个任务完成(类似于你的选项3),队列的工作方式是保留需要执行的任务列表,并且只执行这些任务,按照它们被添加到队列中的顺序执行。

队列动画

注意:asynchronous模块的队列方法实际上可以同时运行多个任务(如上面显示的animation),但是由于我们在这里讲的是数据同步,所以我们不需要这样做。 幸运的是,我们可以告诉它一次只运行一个。

在你的特定情况下,你想要做的是设置一个队列,可以做两种types的任务:

  1. 修改你的数据结构
  2. 将您的数据结构写入磁盘

无论何时从温度探测器获取新数据,都可以将任务添加到队列中,以便用新数据修改数据结构。 然后,每当间隔计时器触发时,将任务添加到将数据结构写入磁盘的队列中。

由于队列一次只能运行一个任务,按顺序将它们添加到队列中,因此保证在将数据写入磁盘时,永远不会修改内存中的数据结构。

一个非常简单的实现可能如下所示:

 var dataQueue = async.queue(function(task, callback) { if (task.type === "newData") { memoryStore.add(task.data); // modify your data structure however you do it now callback(); // let the queue know the task is done; you can pass an error here as usual if needed } else if (task.type === "writeData") { fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) { // error handling callback(err); // let the queue know the task is done }) } else { callback(new Error("Unknown Task")); // just in case we get a task we don't know about } }, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time // call when you get new probe data funcion addNewData(data) { dataQueue.push({task: "newData", data: data}, function(err) { // called when the task is complete; optional }); } // write to disk every 5 minutes setInterval(function() { dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) { // called when the task is complete; optional }); }, 18000); 

另外请注意,您现在可以asynchronous地将数据添加到您的数据结构中。 假设您添加了一个新的探测器,只要其值发生变化就会触发一个事件。 您可以像添加现有的探针一样添加addNewData(data) ,而不用担心与正在进行的修改或磁盘写入冲突(如果您开始写入数据库而不是内存数据存储)。


更新:使用bind()更优雅的实现

这个想法是,您使用bind()将参数绑定到函数,然后将bind()返回的新绑定函数推送到队列中。 这样你就不需要将一些自定义对象推到它必须解释的队列中去; 你可以给它一个函数来调用,所有的设置已经有了正确的参数。 唯一需要注意的是函数必须将callback作为最后一个参数。

这应该允许您使用所有现有的function(可能稍作修改),只需将它们推送到队列中,以确保它们不会同时运行。

我一起扔了这个testing的概念:

 var async = require('async'); var dataQueue = async.queue(function(task, callback) { // task is just a function that takes a callback; call it task(callback); }, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time function storeData(data, callback) { setTimeout(function() { // simulate async op console.log('store', data); callback(); // let the queue know the task is done }, 50); } function writeToDisk(filename, callback) { setTimeout(function() { // simulate async op console.log('write', filename); callback(); // let the queue know the task is done }, 250); } // store data every second setInterval(function() { var data = {date: Date.now()} var boundStoreData = storeData.bind(null, data); dataQueue.push(boundStoreData, function(err) { console.log('store complete', data.date); }) }, 1000) // write to disk every 2 seconds setInterval(function() { var filename = Date.now() + ".dat" var boundWriteToDisk = writeToDisk.bind(null, filename); dataQueue.push(boundWriteToDisk, function(err) { console.log('write complete', filename); }); }, 2000); 

首先 – 让我们展示一个实用的解决scheme,然后让我们深入了解它的工作原理和原理:

 var chain = Promise.resolve(); // Create a resolved promise var fs = Promise.promisifyAll(require("fs")); chain = chain.then(function(){ return fs.writeAsync(...); // A }); // some time in the future chain = chain.then(function(){ return fs.writeAsync(...); // This will always execute after A is done }) 

既然你已经用承诺标记了你的问题,值得一提的是承诺很好地解决了这个(相当复杂的)问题,而且很容易做到。

你的数据同步问题被称为生产者消费者问题。 有很多方法可以解决JavaScript中的同步问题–Q的KrisKowal最近的一篇文章是关于这个主题的一个很好的解读。

input:承诺

以承诺解决问题的最简单的方法是通过一个单一的承诺来链接所有的东西。 我知道你自己承诺的经验,但对于较新的读者让我们回顾一下:

承诺是自我sorting概念的抽象 。 承诺是一个单独的(阅读离散)行动单位。 链接诺言,很像; 在某些语言中,注意到一个操作的结束和下一个操作的开始。 JavaScript中的承诺抽象出两个主要的东西 – 行为花费时间和特殊条件的概念。

在这里有一个更高的抽象,叫做monad ,而A + promise并不严格遵守monad的规则(为了方便起见)有承诺的实现。 诺言抽象了一些处理,monads抽象处理自己的概念,你可以说承诺是一个monad,或者至less是monadic

承诺以待决意义开始,代表已经开始但尚未完成的行动。 在某些时候,他们可能会通过解决办法解决两个国家之一:

  • 已完成 – 表示该操作已成功完成。
  • 已拒绝 – 表示该操作未成功完成。

承诺一旦解决,就不能再改变自己的状态。 就像你可以继续一样; 在下一行 – 你可以继续使用.then关键字将前一个动作链接到下一个动作。

解决生产者 – 消费者。

生产者/消费者问题的传统解决scheme可以通过像Dijkstra的信号量这样的传统并发结构来实现。 事实上,这样的解决scheme是通过承诺或简单的callback来实现的,但我相信我们可以做类似的事情。

相反,我们将继续运行一个程序,并每次添加新的操作。

 var fsQueue = Promise.resolve(); // start a new chain // one place fsQueue = fsQueue.then(function(){ // assuming promisified fs here return fs.writeAsync(...); }); // some other place fsQueue = fsQueue.then(function(){ return fs.writeAsync(...); }); 

向队列添加动作可以确保我们已经命令了同步,动作只会在早期完成之后才会执行。 这是解决这个问题的最简单的同步scheme,需要用fs.asyncFunction调用.then然后将它们放到队列中。

另一种解决scheme是使用类似于“监视器”的东西 – 我们可以通过封装fs来确保访问是一致的:

 var fs = B.promisifyAll(require("fs")); // bluebird promisified fs var syncFs = { // sync stands for synchronized, not synchronous queue: B.resolve(); writeAsync = function(){ var args = arguments return (queue = queue.then( // only execute later return fs.writeAsync.apply(fs,arguments); }); } // promisify other used functions similarly }; 

哪个会产生fs动作的同步版本。 也可以使用类似的东西自动化(未testing):

 // assumes module is promisified and ignores nested functions function synchronize(module){ var ret = {}, queue = B.resolve(); for(var fn in module){ ret[fn] = function(){ var args = arguments; queue = queue.then(function(){ return module[fn].apply(module, args); }) }; } ret.queue = queue; // expose the queue for handling errors return ret; } 

哪个应该产生一个同步所有动作的模块版本。 请注意,我们获得了额外的好处,即错误不会被抑制,文件系统也不会处于不一致的状态,因为在导致不执行操作的错误得到处理之前,操作不会被执行。

这不是和队列类似吗?

是! 队列做了一些非常相似的事情(你可以在其他答案中看到)提供先进先出的结构。 很像程序代码,按照该顺序执行。 在我看来,承诺只是同一枚硬币的强大一面。

另一个答案也通过队列提供了一个可行的select。

关于你build议的方法

切换到同步IO以将数据写入磁盘(对于服务器响应的原因,实际上并不想这么做)。

虽然我同意这是最简单的 – “监视”方法链接所有需要在同一队列上同步的操作非常相似。

设置一个标志,当我开始写数据,并没有logging任何新的数据,而该标志设置(导致我在写入过程中丢失的数据logging)。

该标志实际上是一个互斥体。 如果你阻止(或放弃行动),当有人重试,你有一个真正的互斥体,保持“互斥保证”。

使用该标志重试,并保持下一个动作的列表来保存该标志实际上在信号量的实现中是非常常见的 – 一个例子是在linux内核中。

选项2更复杂的版本,我设置标志和设置标志时,新的数据进入一个单独的临时数据结构,当文件IO完成,然后合并真实的数据(可行,但似乎丑陋)。 拍摄原始数据的快照副本,并花时间将该副本写入磁盘,知道没有人会修改副本。 我不想这样做,因为数据集是相当大的,我在有限的内存环境(树莓派)。

这些方法通常被称为事务RCU更新,在某些情况下,它们实际上是非常现代化和非常快速的,例如“读者作者问题”(这与您所拥有的非常相似)。 原生支持这些在Linux内核最近踢了。 在某些情况下这样做实际上是可行和高性能的,虽然在你的情况下,有点像你所build议的那样过分复杂。

所以,总结一下

  • 这不是一个简单的问题,而是一个有趣的问题。
  • 幸运的是,诺言很好地解决了它们,它们的build立正是为了通过抽象序列的概念来解决这类问题。

快乐的编码,Pi NodeJS项目听起来很棒。 让我知道如果我能进一步澄清这一点。