插入stream式XML数据库

我试图有效地插入大量的数据(XML文件大小超过70GB),而不会崩溃我的MongoDB服务器。 目前这就是我在NodeJS中使用xml-stream所做的:

 var fs = require('fs'), path = require('path'), XmlStream = require('xml-stream'), MongoClient = require('mongodb').MongoClient, assert = require('assert'), ObjectId = require('mongodb').ObjectID, url = 'mongodb://username:password@my.server:27017/mydatabase', amount = 0; var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); var xml = new XmlStream(stream); xml.collect('ns:Statistik'); xml.on('endElement: ns:Statistik', function(item) { var insertDocument = function(db, callback) { db.collection('vehicles').insertOne(item, function(err, result) { amount++; if (amount % 1000 == 0) { console.log("Inserted", amount); } callback(); }); }; MongoClient.connect(url, function(err, db) { insertDocument(db, function() { db.close(); }); }); }); 

当我调用xml.on()它基本上返回当前所在的树/元素。 由于这是JSON的直线上升,我可以把它给我的db.collection().insertOne()函数作为参数,它将它插入数据库完全是我想要的。

所有的代码实际上都像现在这样工作,但是在大约3000次插入之后停止(大约需要10秒)。 我怀疑这是因为我打开一个数据库连接,插入数据,然后每次在XML文件中看到一棵树时closures连接,在这种情况下大约有3000次。

我可以,不知何故,合并insertMany()函数,并以100s(或更多)的块来做,但我不太确定如何将它与所有的stream和asynchronous。

所以我的问题是:如何插入大量的XML(到JSON)到我的MongoDB数据库,而不会崩溃?

你有权假定.insertMany()会比每一次写入都好,所以它实际上只是收集“stream”上的数据。

由于执行是“asynchronous”的,因此通常要避免在堆栈中存在太多活动调用,所以通常在调用.insertMany().resume()一次callback之前调用“stream”完成:

 var fs = require('fs'), path = require('path'), XmlStream = require('xml-stream'), MongoClient = require('mongodb').MongoClient, url = 'mongodb://username:password@my.server:27017/mydatabase', amount = 0; MongoClient.connect(url, function(err, db) { var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); var xml = new XmlStream(stream); var docs = []; //xml.collect('ns:Statistik'); // This is your event for the element matches xml.on('endElement: ns:Statistik', function(item) { docs.push(item); // collect to array for insertMany amount++; if ( amount % 1000 === 0 ) { xml.pause(); // pause the stream events db.collection('vehicles').insertMany(docs, function(err, result) { if (err) throw err; docs = []; // clear the array xml.resume(); // resume the stream events }); } }); // End stream handler - insert remaining and close connection xml.on("end",function() { if ( amount % 1000 !== 0 ) { db.collection('vehicles').insertMany(docs, function(err, result) { if (err) throw err; db.close(); }); } else { db.close(); } }); }); 

甚至有些现代化:

 const fs = require('fs'), path = require('path'), XmlStream = require('xml-stream'), MongoClient = require('mongodb').MongoClient; const uri = 'mongodb://username:password@my.server:27017/mydatabase'; (async function() { let amount = 0, docs = [], db; try { db = await MongoClient.connect(uri); const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')), xml = new XmlStream(stream); await Promise((resolve,reject) => { xml.on('endElement: ns:Statistik', async (item) => { docs.push(item); amount++; if ( amount % 1000 === 0 ) { try { xml.pause(); await db.collection('vehicle').insertMany(docs); docs = []; xml.resume(); } catch(e) { reject(e) } } }); xml.on('end',resolve); xml.on('error',reject); }); if ( amount % 1000 !== 0 ) { await db.collection('vehicle').insertMany(docs); } } catch(e) { console.error(e); } finally { db.close(); } })(); 

请注意, MongoClient连接实际上包装了所有其他操作。 您只想连接一次 ,其他操作发生在“stream”的事件处理程序上。

因此,对于您的XMLStream ,事件处理程序在expression式匹配时触发,并将数据提取并收集到数组中。 调用.insertMany()调用的每1000个项目.insertMany()插入文档,在“asynchronous”调用上“暂停”和“恢复”。

一旦完成,“结束”事件在“stream”上被触发。 这是closures数据库连接的地方,事件循环将被释放并结束程序。

虽然可以通过允许各种.insertMany()调用一次发生(通常为“合并大小”以避免溢出调用堆栈)来获得某种程度的“并行性”,但这基本上是该进程的外观最简单的forms就是在等待其他asynchronousI / O完成时简单地暂停。

注意 :根据后续问题,从原始代码.collect()方法,这看起来并不是必须的,实际上是在每次写入数据库之后都保留内存中真正应该丢弃的节点。