如何检测一连串的相关事件完成,然后发出一个事件

我的第一个问题! 我真的很想问这个问题,所以请帮助我改进,如果我能更好地问。

这是我发现的似乎是远程相关的唯一问题,但我无法弄清楚如何将其与我正在尝试做的事情联系起来(他们的问题是特定于JQuery的;我的是Node.JS specific-ish [虽然我发现一个浏览器版本的EventEmitter,并且能够在浏览器中进行testing]): 使用jQuery为每个事件突发运行一次


问题

我有一个过程,我知道会在一段时间内发出一连串的事件。

为了模拟这个过程,我编写了这个代码:

/*******************************************************/ /* This part taken directly from */ /* https://nodejs.org/api/events.html */ /* (with addition of "burstID") */ /* */ /* */ /* */ const EventEmitter = require('events'); /* */ /* */ /* */ /* */ class MyEmitter extends EventEmitter {} /* */ /* */ /* */ /* */ const myEmitter = new MyEmitter(); /* */ /* */ myEmitter.on('event', (burstID) => { /* */ /* */ console.log('an event occurred!', burstID); /* */ /* */ }); /* */ /* */ /* */ /*******************************************************/ const millisecondsToSustainBurst = 3000 ; const millisecondsBetweenPossibleEventEmissions = 200 ; const millisecondsUntilStartNextBurst = 5000 ; const millisecondsUntilNoMoreBursts = 23000 ; const now = new Date() ; console.log('Time now: ' + now + '; should run until about ' + new Date(now.getTime() + millisecondsUntilNoMoreBursts)) ; const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => { if (Math.random() > 0.5) myEmitter.emit('event', burstID) ; if ( !((new Date()) - startTimestamp > millisecondsToSustainBurst) ) setTimeout(() => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID), millisecondsBetweenPossibleEventEmissions) ; } const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => { if ( !((new Date()) - startTimestamp > millisecondsUntilNoMoreBursts) ) { const now = new Date() ; console.log('Time now: ' + now + '; starting random-event burst which will run for ' + (millisecondsToSustainBurst/1000) + ' seconds. ') ; setTimeout(() => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback), millisecondsUntilStartNextBurst) ; doRandomEmitBurst(new Date(), millisecondsToSustainBurst, 'burstThatStartedAt' + now.getHours() + 'h' + now.getMinutes() + 'm' + now.getSeconds() + 's') ; } else callback() ; } doRegularRandomBursts(new Date(), millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, () => console.log('Done at ' + (new Date()))) ; const myBurstDetectedEmitter = new MyEmitter() ; // NOW, what do I do HERE to say: // I've seen a number of events occur in a 5-second period // Now they've stopped // Therefore I'm going to emit a different kind of event 

现在,假设我想要听取这些突发事件的发生。

在采取进一步行动之前,我想确保爆发已经结束。

我该怎么做?


到目前为止,我已经尝试过

首先,我可以创build一个全局的“var”(yuck,我想避免mutable),像这样:

 var timeLastUpdated = {} ; 

…接着…

 function keepCheckingTimeLastUpdated(keyForUpdateCheck, callback) { const timestampToCheckInOneSecond = (typeof timeLastUpdated[keyForUpdateCheck] !== 'undefined' ? timeLastUpdated[keyForUpdateCheck] : (new Date())) ; setTimeout(() => { console.log( 'checking if modifications to "' + keyForUpdateCheck + '" have occurred since ' + timestampToCheckInOneSecond ) ; if (timeLastUpdated[keyForUpdateCheck] === timestampToCheckInOneSecond) { delete timeLastUpdated[keyForUpdateCheck] ; callback() ; } else keepCheckingTimeLastUpdated(keyForUpdateCheck, callback) ; }, 5000) ; } const makeNotificationHandler = () => (keyForUpdateCheck) => { const timeNow = new Date() ; if (typeof timeLastUpdated[keyForUpdateCheck] === 'undefined') keepCheckingTimeLastUpdated(keyForUpdateCheck, () => console.log(keyForUpdateCheck + ' changed')) ; timeLastUpdated[keyForUpdateCheck] = timeNow ; } ; myEmitter.on('event', makeNotificationHandler()) ; 

这只是这似乎是一个反模式(我希望我使用这个词是正确的)。 我的直觉说,拥有一个全局对象是这里的错误方法,而且还有一个更类似于函数式编程的解决scheme。


只为这些人感兴趣:

(为了回答问题,随意忽略)

增加了复杂性:在我的示例代码中,“burstID”将永远不会相同,但在现实世界的例子中,可能是这样。 我想等到自上次“burstID”出现以来经过了一段时间,才能确定是否真正完成了突发变化。

对于上下文,在实际应用程序中,我使用node-postgres在PostGres数据库上设置了“LISTEN”。 “burstID”是一个表中的主键,也可以用作多个其他表中的外键。 我正在监听所有使用共享密钥的表,并且我得到的信息包含此密钥。

回答我自己的问题,在上面的意见的帮助下。

十分感谢你!” 感谢Scott Sauyet的帮助和鼓励。


我决定创build一个我称之为“Accumulomator”的东西:一个自动累加器。 邮件被发送到这些Accumulomators,我认为他们是一个仓库里的人和一个秒表。

一旦Accumulomator被实例化,它就开始查看它的秒表。 每当秒expression到最后时,Accumulomator会查看是否与上次相同。 如果它和上次一样,Accumulomator把它的信息包打包,交给仓库,然后在晴朗的地方退休。

全面更新的代码如下。 我欢迎任何编辑来改进代码。


 // To test in a browser, use: // https://github.com/Olical/EventEmitter // …in place of const EventEmitter = require('events'); /*******************************************************/ /* This part taken directly from */ /* https://nodejs.org/api/events.html */ /* (with addition of "burstID") */ /* */ /* */ /* */ const EventEmitter = require('events'); /* */ /* */ /* */ /* */ class MyEmitter extends EventEmitter {} /* */ /* */ /* */ /* */ const myEmitter = new MyEmitter(); /* */ /* */ myEmitter.on('event', (burstID) => { /* */ /* console.log('an event occurred!', burstID); */ /* */ }); /* */ /* */ /* */ /*******************************************************/ const millisecondsToSustainBurst = 3000 ; const millisecondsBetweenPossibleEventEmissions = 200 ; const millisecondsUntilStartNextBurst = 5000 ; const millisecondsUntilNoMoreBursts = 23000 ; const now = new Date() ; console.log('Bursts starting. Time now: ' + now + '; should run until about ' + new Date(now.getTime() + millisecondsUntilNoMoreBursts)) ; const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => { if (Math.random() > 0.5) myEmitter.emit('event', burstID) ; if ( !((new Date()) - startTimestamp > millisecondsToSustainBurst) ) setTimeout(() => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID), millisecondsBetweenPossibleEventEmissions) ; } const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => { if ( !((new Date()) - startTimestamp > millisecondsUntilNoMoreBursts) ) { const now = new Date() ; console.log('Time now: ' + now + '; starting random-event burst which will run for ' + (millisecondsToSustainBurst/1000) + ' seconds. ') ; setTimeout(() => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback), millisecondsUntilStartNextBurst) ; doRandomEmitBurst(new Date(), millisecondsToSustainBurst, 'burstThatStartedAt' + now.getHours() + 'h' + now.getMinutes() + 'm' + now.getSeconds() + 's') ; } else callback() ; } doRegularRandomBursts(new Date(), millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, () => console.log('Done at ' + (new Date()))) ; const makeAccumulomator = (config) => { if (typeof config !== 'object') throw new Error('Must specify configuration object.') ; if (typeof config.callback !== 'function') throw new Error('Must specify callback function for when the end of new messages is reached.') ; if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ; if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ; if (typeof config.onStop !== 'function' && typeof config.onStop !== 'undefined') throw new Error('If defined at all, onStop must be a function.') ; const accumulomator = {} ; var accumulatedMessages = [] ; var stop = false ; const keepCheckingTimeLastUpdated = (callback) => { const timestampToCheckInOneSecond = (accumulatedMessages.length > 0 ? accumulatedMessages[accumulatedMessages.length - 1].timestamp : (new Date())) ; setTimeout(() => { if (stop) { if (typeof config.onStop === 'function') config.onStop() ; } else if (accumulatedMessages.length < 1) keepCheckingTimeLastUpdated(callback) ; else if (accumulatedMessages[accumulatedMessages.length - 1].timestamp === timestampToCheckInOneSecond) { stop = true ; callback() ; } else keepCheckingTimeLastUpdated(callback) ; }, config.millisecondsBetweenChecks) ; } ; keepCheckingTimeLastUpdated(config.callback) ; accumulomator.receiveMessage = (message) => accumulatedMessages.push({ message: message, timestamp: (new Date())}) ; accumulomator.stopOnNextCheck = () => { if (stop === true) throw new Error('Accumulomator is already stopped.') ; else stop = true ; } accumulomator.isActive = () => stop === false ; accumulomator.getAccumulatedMessages = () => accumulatedMessages ; return accumulomator ; } const makeAccumulomatorWarehouse = (config) => { if (typeof config !== 'object') throw new Error('Must specify configuration object.') ; if (typeof config.callback !== 'function') throw new Error('Must specify callback function.') ; if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ; if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ; if (typeof config.messageRouter !== 'function') throw new Error('Must specify message router function.') ; if (typeof config.sendCallbackAccumulatedMessages !== 'undefined') if (typeof config.sendCallbackAccumulatedMessages !== 'boolean') throw new Error('Must specify whether or not to send callback accumulated messages as a boolean (if unspecified, accumulated messages will not be included).') ; if (typeof config.onAccumulomatorStop !== 'function' && typeof config.onAccumulomatorStop !== 'undefined') throw new Error('If defined at all, onAccumulomatorStop must be a function.') ; var sendCallbackAccumulatedMessages = false ; if (typeof config.sendCallbackAccumulatedMessages !== 'undefined') sendCallbackAccumulatedMessages = config.sendCallbackAccumulatedMessages ; const accumulomatorWarehouse = {} ; const accumulomators = {} ; var warehouseIsShuttingDown = false ; accumulomatorWarehouse.receiveMessage = (message) => { accumulomatorName = config.messageRouter(message) ; if (typeof accumulomatorName !== 'string') throw new Error('The value returned from messageRouter must be a string with a unique identifier.') ; if (typeof accumulomators[accumulomatorName] === 'object') if (accumulomators[accumulomatorName].isActive() === false) delete accumulomators[accumulomatorName] ; if (typeof accumulomators[accumulomatorName] === 'undefined') { if (warehouseIsShuttingDown === false) accumulomators[accumulomatorName] = makeAccumulomator({ callback: () => config.callback( (() => { objectToReturn = {} ; objectToReturn.key = accumulomatorName ; if (sendCallbackAccumulatedMessages === true) objectToReturn.messages = accumulomators[accumulomatorName].getAccumulatedMessages() ; return objectToReturn ; })() ) , millisecondsBetweenChecks: config.millisecondsBetweenChecks , onStop: () => config.onAccumulomatorStop(accumulomatorName) }) ; } if (typeof accumulomators[accumulomatorName] === 'object') accumulomators[accumulomatorName].receiveMessage(message) ; } periodicallyRetireAccumulomators = () => { Object.keys(accumulomators).forEach((accumulomator) => { if (accumulomators[accumulomator].isActive() === false) delete accumulomators[accumulomator] ; }) ; if (!(warehouseIsShuttingDown === true && Object.keys(accumulomators).length === 0)) setTimeout(periodicallyRetireAccumulomators, 10000) ; } ; periodicallyRetireAccumulomators() ; accumulomatorWarehouse.shutDownWarehouse = () => { Object.keys(accumulomators).forEach((accumulomator) => { if (accumulomators[accumulomator].isActive() === true) accumulomators[accumulomator].stopOnNextCheck() ; }) ; warehouseIsShuttingDown = true ; } return accumulomatorWarehouse ; } myAccumulomatorWarehouse = makeAccumulomatorWarehouse({ callback: (accumulomatorWarehousePackage) => console.log('Done with accumulomator.', accumulomatorWarehousePackage.key, accumulomatorWarehousePackage.messages) , millisecondsBetweenChecks: 2000 , messageRouter: (message) => message , sendCallbackAccumulatedMessages: true , onAccumulomatorStop: (accumulomatorName) => console.log('Accumulomator for ' + accumulomatorName + ' manually stopped') }) ; myEmitter.on('event', myAccumulomatorWarehouse.receiveMessage) ; setTimeout(myAccumulomatorWarehouse.shutDownWarehouse, 10000) ;