如何使用 nodejs 流式传输 MongoDB 查询结果?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/7372626/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to stream MongoDB Query Results with nodejs?
提问by Jan Algermissen
I have been searching for an example of how I can stream the result of a MongoDB query to a nodejs client. All solutions I have found so far seem to read the query result at once and then send the result back to the server.
我一直在寻找一个示例,说明如何将 MongoDB 查询的结果流式传输到 nodejs 客户端。到目前为止,我发现的所有解决方案似乎都是立即读取查询结果,然后将结果发送回服务器。
Instead, I would (obviously) like to supply a callback to the query method and have MongoDB call that when the next chunk of the result set is available.
相反,我(显然)希望为查询方法提供回调,并让 MongoDB 在下一个结果集块可用时调用它。
I have been looking at mongoose - should I probably use a different driver?
我一直在看猫鼬 - 我应该使用不同的驱动程序吗?
Jan
简
采纳答案by nab
Streaming in Mongoose became available in version 2.4.0 which appeared three monthsafter you've posted this question:
Mongoose 中的流式传输在您发布此问题三个月后出现的 2.4.0 版中可用:
Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
More elaborated examples can be found on their documentation page.
更详细的示例可以在他们的文档页面上找到。
回答by Dan Milon
node-mongodb-driver(the underlying layer that every mongoDB client uses in nodejs) except the cursor API that others mentioned has a nice stream API (#458). Unfortunately i did not find it documented elsewhere.
node-mongodb-driver(每个 mongoDB 客户端在 nodejs 中使用的底层),除了其他人提到的游标 API 有一个很好的流 API(#458)。不幸的是,我没有在其他地方找到它的记录。
Update: there are docsalso here.
It can be used like this:
它可以像这样使用:
var stream = collection.find().stream()
stream.on('error', function (err) {
console.error(err)
})
stream.on('data', function (doc) {
console.log(doc)
})
It actually implements the ReadableStream interface, so it has all the goodies (pause/resume etc)
它实际上实现了 ReadableStream 接口,因此它具有所有优点(暂停/恢复等)
回答by Gates VP
mongooseis not really "driver", it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native).
mongoose并不是真正的“驱动程序”,它实际上是 MongoDB 驱动程序 ( node-mongodb-native)周围的 ORM 包装器。
To do what you're doing, take a look at the driver's .findand .eachmethod. Here's some code from the examples:
要执行您正在执行的操作,请查看驱动程序.find和.each方法。以下是示例中的一些代码:
// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
sys.puts("Printing docs from Cursor Each")
cursor.each(function(err, doc) {
if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
})
});
To stream the results, you're basically replacing that sys.putswith your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush(), but you may also want to checkout socket.io.
要流式传输结果,您基本上是sys.puts用“流”函数替换它。不确定您打算如何流式传输结果。我想你可以这样做response.write() + response.flush(),但你可能还想结帐socket.io。
回答by Jan Algermissen
Here is the solution I found (please correct me anyone if thatis the wrong way to do it): (Also excuse the bad coding - too late for me now to prettify this)
这是我找到的解决方案(如果这是错误的方法,请纠正我):(也请原谅糟糕的编码 - 现在我来美化这个为时已晚)
var sys = require('sys')
var http = require("http");
var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;
var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));
var products;
db.open(function (error, client) {
if (error) throw error;
products = new Collection(client, 'products');
});
function ProductReader(collection) {
this.collection = collection;
}
ProductReader.prototype = new process.EventEmitter();
ProductReader.prototype.do = function() {
var self = this;
this.collection.find(function(err, cursor) {
if (err) {
self.emit('e1');
return;
}
sys.puts("Printing docs from Cursor Each");
self.emit('start');
cursor.each(function(err, doc) {
if (!err) {
self.emit('e2');
self.emit('end');
return;
}
if(doc != null) {
sys.puts("doc:" + doc.name);
self.emit('doc',doc);
} else {
self.emit('end');
}
})
});
};
http.createServer(function(req,res){
pr = new ProductReader(products);
pr.on('e1',function(){
sys.puts("E1");
res.writeHead(400,{"Content-Type": "text/plain"});
res.write("e1 occurred\n");
res.end();
});
pr.on('e2',function(){
sys.puts("E2");
res.write("ERROR\n");
});
pr.on('start',function(){
sys.puts("START");
res.writeHead(200,{"Content-Type": "text/plain"});
res.write("<products>\n");
});
pr.on('doc',function(doc){
sys.puts("A DOCUMENT" + doc.name);
res.write("<product><name>" + doc.name + "</name></product>\n");
});
pr.on('end',function(){
sys.puts("END");
res.write("</products>");
res.end();
});
pr.do();
}).listen(8000);
回答by postJS
I have been studying mongodb streams myself, while I do not have the entire answer you are looking for, I do have part of it. you can setup a socket.io stream
我自己一直在研究 mongodb 流,虽然我没有你正在寻找的完整答案,但我有一部分。您可以设置 socket.io 流
this is using javascript socket.io and socket.io-streaming available at NPM also mongodb for the database because using a 40 year old database that has issues is incorrect, time to modernize also the 40 year old db is SQL and SQL doesn't do streams to my knowledge
这是使用 javascript socket.io 和 socket.io-streaming 在 NPM 上可用的数据库也是 mongodb 因为使用有问题的 40 年历史的数据库是不正确的,现代化的时间也是 40 年历史的数据库是 SQL 而 SQL 没有根据我的知识做流
So although you only asked about data going from server to client, I also want to get client to server in my answer because I can NEVER find it anywhere when I search and I wanted to setup one place with both the send and receive elements via stream so everyone could get the hang of it quickly.
因此,尽管您只询问了从服务器到客户端的数据,但我还想在我的回答中让客户端到服务器,因为我在搜索时永远无法在任何地方找到它,并且我想通过流设置一个包含发送和接收元素的地方这样每个人都可以快速掌握它。
client side sending data to server via streaming
客户端通过流向服务器发送数据
stream = ss.createStream();
blobstream=ss.createBlobReadStream(data);
blobstream.pipe(stream);
ss(socket).emit('data.stream',stream,{},function(err,successful_db_insert_id){
//if you get back the id it went into the db and everything worked
});
server receiving stream from the client side and then replying when done
服务器从客户端接收流,然后在完成后回复
ss(socket).on('data.stream.out',function(stream,o,c){
buffer=[];
stream.on('data',function(chunk){buffer.push(chunk);});
stream.on('end',function(){
buffer=Buffer.concat(buffer);
db.insert(buffer,function(err,res){
res=insertedId[0];
c(null,res);
});
});
});
//This is the other half of that the fetching of data and streaming to the client
//这是获取数据和流式传输到客户端的另一半
client side requesting and receiving stream data from server
客户端从服务器请求和接收流数据
stream=ss.createStream();
binarystring='';
stream.on('data',function(chunk){
for(var I=0;i<chunk.length;i++){
binarystring+=String.fromCharCode(chunk[i]);
}
});
stream.on('end',function(){ data=window.btoa(binarystring); c(null,data); });
ss(socket).emit('data.stream.get,stream,o,c);
server side replying to request for streaming data
服务器端回复流数据请求
ss(socket).on('data.stream.get',function(stream,o,c){
stream.on('end',function(){
c(null,true);
});
db.find().stream().pipe(stream);
});
The very last one there is the only one where I am kind of just pulling it out of my butt because I have not yet tried it, but that should work. I actually do something similar but I write the file to the hard drive then use fs.createReadStream to stream it to the client. So not sure if 100% but from what I read it should be, I'll get back to you once I test it.
最后一个是唯一一个我只是把它从我的屁股里拉出来的,因为我还没有尝试过,但这应该有效。我实际上做了类似的事情,但我将文件写入硬盘驱动器,然后使用 fs.createReadStream 将其流式传输到客户端。所以不确定是否 100% 但从我读到的内容来看,我会在测试后回复你。
P.s. anyone want to bug me about my colloquial way of talking, I'm Canadian, and I love saying "eh" come at me with your hugs and hits bros/sis' :D
Ps 任何人都想就我的口语说话方式来打扰我,我是加拿大人,我喜欢说“嗯”来拥抱我并击中兄弟/姐妹':D

