在nodejs中的Mongodb Tailable Cursor,如何停止stream

我使用下面的代码从mongodb capped集合中获取数据

函数listen(条件,callback){

db.openConnectionsNew( [req.session.client_config.db] , function(err, conn){ if(err) {console.log({err:err}); return next(err);} coll = db.opened[db_name].collection('messages'); latestCursor = coll.find(conditions).sort({$natural: -1}).limit(1) latestCursor.nextObject(function(err, latest) { if (latest) { conditions._id = {$gt: latest._id} } options = { tailable: true, awaitdata: true, numberOfRetries: -1 } stream = coll.find(conditions, options).sort({$natural: -1}).stream() stream.on('data', callback) }); }); } 

然后我使用sockets.broadcast(roomName,'data',document);

在客户端

 io.socket.get('/get_messages/', function(resp){ }); io.socket.on('data', function notificationReceivedFromServer ( data ) { console.log(data); 

});

这工作完美,因为我能够看到任何插入数据库的新文件。

我可以在mongod -verbose中看到,在每个200ms后查询运行查询{$ gt:latest_id},这是好的,但我不知道如何closures此查询:(我是非常新的nodejs和使用mongodb tailable选项第一次,完全失去了,任何帮助或线索高度赞赏

.stream()返回的Cursor对象的.stream()方法返回的是一个节点转换stream接口的实现。 具体来说,这是一个“可读”的stream。

因此,当数据stream中有新的数据被接收和读取时,就会发出“数据”事件。

还有其他的方法,例如.pause().resume() ,可以用来控制这些事件的stream程。 通常情况下,您可以在“数据”事件callback中调用这些“内部”事件,以确保在处理“下一个”数据事件之前执行该callback中的代码:

 stream.on("data", function(data) { // pause before processing stream.pause(); // do some work, possibly with a callback something(function(err,result) { // Then resume when done stream.resume(); }); }); 

但是,这当然只是“范围”的问题。 所以只要“stream”variables被定义在另一段代码可以访问的范围内,那么你可以随时调用任一方法。

同样,出于同样的范围界定,您可以在代码中的任何位置“取消”取消定义“stream”对象,从而使“事件处理”变得多余。

 // Just overwrite the object scope = undefined; 

所以值得了解。 实际上,节点驱动程序的较新版本“2.x”将一个“stream接口”直接包装到标准Cursor对象中,而不需要调用.stream()进行转换。 节点stream是非常有用和强大的事情,这将是非常值得,而他们的使用条款。