How to stream MongoDB Query Results with nodejs?
node-mongodb-driver
(the underlying layer that every MongoDB client uses in Node.js) has, in addition to the cursor API that others mentioned, a nice stream API (#458). Unfortunately, I did not find it documented elsewhere.
Update: there are docs.
It can be used like this:
// Ask the find() cursor for a stream; results are delivered through events.
var stream = collection.find().stream();

// Anything that goes wrong while reading is printed to stderr.
stream.on('error', function reportStreamError(streamError) {
  console.error(streamError);
});

// Each document is printed as soon as the stream hands it over.
stream.on('data', function printDocument(document) {
  console.log(document);
});
It actually implements the ReadableStream interface, so it has all the goodies (pause/resume, etc.).
Here is the solution I found (please correct me, anyone, if that is the wrong way to do it): (Also, excuse the bad coding - it is too late for me to prettify this now.)
var sys = require('sys')
var http = require("http");
var EventEmitter = require('events').EventEmitter;
var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;
// Database handle for "test" on localhost, using the port the driver
// exposes as Connection.DEFAULT_PORT.
var db = new Db(
  'test',
  new Server('localhost', Connection.DEFAULT_PORT, {})
);

// Handle to the "products" collection. It stays undefined until the
// open() callback below has run successfully.
var products;

db.open(function onDbOpen(openError, dbClient) {
  // A failed connection is fatal for this script: rethrow it.
  if (openError) {
    throw openError;
  }
  products = new Collection(dbClient, 'products');
});
/**
 * Reads every document of a collection and reports progress as events.
 *
 * Events emitted by do():
 *   'e1'    - find() reported an error; nothing else is emitted afterwards.
 *   'start' - a cursor was obtained; documents may follow.
 *   'doc'   - one document, passed as the only listener argument.
 *   'e2'    - the cursor reported an error; immediately followed by 'end'.
 *   'end'   - no further documents will be emitted.
 *
 * @param {Object} collection - Object exposing find(callback). The callback
 *   receives (err, cursor), and cursor exposes each(callback) whose callback
 *   receives (err, doc), with a null doc marking the end of the results.
 */
function ProductReader(collection) {
  // Initialise the emitter state on this instance so that listeners are
  // stored per reader instead of on the shared prototype.
  EventEmitter.call(this);
  this.collection = collection;
}

// Inherit from EventEmitter.prototype rather than from an EventEmitter
// instance: an instance used as the prototype carries its own listener
// table, which every ProductReader would then share.
ProductReader.prototype = Object.create(EventEmitter.prototype, {
  constructor: { value: ProductReader, writable: true, configurable: true }
});

/**
 * Runs find() on the collection and emits the events described on the
 * constructor. Takes no arguments and returns nothing; results are
 * delivered through events only.
 */
ProductReader.prototype.do = function() {
  var self = this;
  this.collection.find(function(err, cursor) {
    if (err) {
      self.emit('e1');
      return;
    }
    console.log("Printing docs from Cursor Each");
    self.emit('start');
    cursor.each(function(err, doc) {
      // Only a real error aborts the iteration. The previous `!err` test
      // was inverted, so every successful callback was reported as 'e2'
      // and no document was ever emitted.
      if (err) {
        self.emit('e2');
        self.emit('end');
        return;
      }
      if (doc != null) {
        console.log("doc:" + doc.name);
        self.emit('doc', doc);
      } else {
        // A null document marks the end of the result set.
        self.emit('end');
      }
    });
  });
};
// HTTP endpoint on port 8000. Each request reads the products collection
// through a ProductReader and writes one <product> element per document
// to the response as the reader emits it.
http.createServer(function(req, res) {
  // `products` is only assigned once the database connection has been
  // opened; answer with 503 instead of crashing on an undefined collection.
  if (!products) {
    res.writeHead(503, {"Content-Type": "text/plain"});
    res.end("database not ready\n");
    return;
  }

  // Declared with `var` so the reader is local to this request; without
  // it `pr` was an implicit global overwritten by every request.
  var pr = new ProductReader(products);

  // console.log replaces sys.puts, which current Node.js no longer provides.
  pr.on('e1', function() {
    // find() failed before any output was written: report it as a 400.
    console.log("E1");
    res.writeHead(400, {"Content-Type": "text/plain"});
    res.write("e1 occurred\n");
    res.end();
  });
  pr.on('e2', function() {
    // The cursor failed mid-stream; headers are already sent, so only a
    // marker line is written. The 'end' handler closes the response.
    console.log("E2");
    res.write("ERROR\n");
  });
  pr.on('start', function() {
    console.log("START");
    res.writeHead(200, {"Content-Type": "text/plain"});
    res.write("<products>\n");
  });
  pr.on('doc', function(doc) {
    console.log("A DOCUMENT" + doc.name);
    res.write("<product><name>" + doc.name + "</name></product>\n");
  });
  pr.on('end', function() {
    console.log("END");
    res.write("</products>");
    res.end();
  });

  pr.do();
}).listen(8000);
Streaming in Mongoose became available in version 2.4.0, which appeared three months after you posted this question:
// Select documents whose `created` value is >= twoWeeksAgo, then pipe the
// query's stream straight into writeStream.
Model
  .where('created')
  .gte(twoWeeksAgo)
  .stream()
  .pipe(writeStream);
More elaborate examples can be found on their documentation page.
mongoose
is not really a "driver"; it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native
).
To do what you're doing, take a look at the driver's .find
and .each
method. Here's some code from the examples:
// Find all records. find() passes a cursor to the callback.
collection.find(function onCursor(err, cursor) {
  sys.puts("Printing docs from Cursor Each");

  // Print every document the cursor delivers; a null document is skipped.
  cursor.each(function onDocument(err, doc) {
    if (doc == null) {
      return;
    }
    sys.puts("Doc from Each " + sys.inspect(doc));
  });
});
To stream the results, you're basically replacing that sys.puts
with your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush()
, but you may also want to check out socket.io
.