如何使用nodejsstream式传输MongoDB查询结果?

我一直在寻找一个如何将一个MongoDB查询的结果stream到一个nodejs客户端的例子。 到目前为止,我发现的所有解决scheme似乎都是一次读取查询结果,然后将结果发送回服务器。

相反,我会(显然)喜欢提供一个callback查询方法,并有MongoDB调用时,结果集的下一个块是可用的。

我一直在看mongoose – 我应该使用不同的驱动程序?

一月

在发布此问题三个月之后,在Mongoose中进行stream式处理的版本已经出现在2.4.0版本中:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream); 

更详细的例子可以在他们的文档页面find。

node-mongodb-driver (每个mongoDB客户端在nodejs中使用的基础层)除了其他人提到的游标API有一个漂亮的streamAPI( #458 )。 不幸的是,我没有在其他地方find它。

更新: 这里也有文档 。

它可以像这样使用:

 var stream = collection.find().stream() stream.on('error', function (err) { console.error(err) }) stream.on('data', function (doc) { console.log(doc) }) 

它实际上实现了ReadableStream接口,所以它有所有的好东西(暂停/恢复等)

mongoose不是真正的“驱动程序”,它实际上是一个围绕MongoDB驱动程序( node-mongodb-native )的ORM包装。

要做你正在做的事情,看看驱动程序的.find.each方法。 以下是一些示例代码:

 // Find all records. find() returns a cursor collection.find(function(err, cursor) { sys.puts("Printing docs from Cursor Each") cursor.each(function(err, doc) { if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc)); }) }); 

为了传输结果,你基本上用你的“stream”函数replace了这个sys.puts 。 不知道您计划如何stream式传输结果。 我认为你可以做response.write() + response.flush() ,但你也可能想签出socket.io

这是我find的解决scheme(请纠正我的任何人,如果这是做错的方法):(也请原谅错误的编码 – 现在已经太迟了,以美化这一点)

 var sys = require('sys') var http = require("http"); var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db, Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection, Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection, Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server; var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {})); var products; db.open(function (error, client) { if (error) throw error; products = new Collection(client, 'products'); }); function ProductReader(collection) { this.collection = collection; } ProductReader.prototype = new process.EventEmitter(); ProductReader.prototype.do = function() { var self = this; this.collection.find(function(err, cursor) { if (err) { self.emit('e1'); return; } sys.puts("Printing docs from Cursor Each"); self.emit('start'); cursor.each(function(err, doc) { if (!err) { self.emit('e2'); self.emit('end'); return; } if(doc != null) { sys.puts("doc:" + doc.name); self.emit('doc',doc); } else { self.emit('end'); } }) }); }; http.createServer(function(req,res){ pr = new ProductReader(products); pr.on('e1',function(){ sys.puts("E1"); res.writeHead(400,{"Content-Type": "text/plain"}); res.write("e1 occurred\n"); res.end(); }); pr.on('e2',function(){ sys.puts("E2"); res.write("ERROR\n"); }); pr.on('start',function(){ sys.puts("START"); res.writeHead(200,{"Content-Type": "text/plain"}); res.write("<products>\n"); }); pr.on('doc',function(doc){ sys.puts("A DOCUMENT" + doc.name); res.write("<product><name>" + doc.name + "</name></product>\n"); }); pr.on('end',function(){ sys.puts("END"); res.write("</products>"); res.end(); }); pr.do(); }).listen(8000);