使用node.js在neo4j中导入CSV

我正在尝试使用node.js将csv文件导入neo4j 。 我必须将数据插入到多个collection/table ,所以我必须使用node.js脚本来插入数据。 但是我的问题是,插入csv数据时无法防止数据重复。

CSV数据示例:

 name ------------- Afghanistan Afghanistan Aland Albania Albania Bangladesh Bangladesh 

index.js

 cp = require('child_process'); child = cp.fork(__dirname + "/background-import-csv-file.js"); child.on('message', function(msg) { console.log("background-insert-process said : ", msg); }); file = path.resolve(__dirname, `./file/simplemaps.csv`); child.send(file); 

background-import-csv-file.js ,我以两种不同的方式编写代码。

基于第一个承诺background-import-csv-file.js ):

 cp = require('child_process'); csv = require('fast-csv'); Q = require('q'); DB = require("./common/driver"); Country = require('./collection/country'); process.on("message", (file) => { stream = fs.createReadStream(file); csv .fromStream(stream, { headers: true }) .on("data", function(data) { let countryData = { "name": data.name }; neo = new DB(); country = new Country(neo); country.insert(countryData) .then(resp => process.send(resp.msg) ) .catch(err => process.send(err) ) }) .on("end", () => process.send("file read complete") ); }); 

./collection/country.js

  Q = require('q'); Country = function Country(neo) { this.country = "Country"; this.neo = neo; }; Country.prototype.find = function find(filters) { query = `MATCH (a:Country { name: '${filters.name}' } ) RETURN {country:properties(a)}`; return this.neo.run(query, filters).then(resp => resp); } Country.prototype.create = function create(data) { query = `CREATE (ax:Country { name: '${data.name}' } ) RETURN ax `; return this.neo.run(query, {}).then(resp => resp[0].properties).catch(err => err) } Country.prototype.insert = function insert(country) { filter = { name: country.name }; return Q(this.find(filter)) .then(resp => resp.length > 0 ? Q.resolve({ msg: `country: [${country.name}] is already exist` }) : Q.resolve(this.create(country)) ) .then(resp => resp) .catch(e => Q.reject(e)); } module.exports = Country; 

./common/driver.js

 neo4j = require('neo4j-driver').v1; function DB() { this.driver = neo4j.driver(); this.session = this.driver.session(); } DB.prototype.run = function run(query, data) { return this.session.run(query, data) .then(response => response.records.map( record => record._fields[0] ? record._fields.length ? record._fields[0] : {} : {} ) ).catch(err => new Error(err) ); } module.exports = DB; 

当我在terminal运行index.js时,我在数据库中有2个Afghanistan ,1个Aland ,2个Albania和2个Bangladesh 。 但我的数据库中需要1个Afghanistan ,1个Aland ,1个Albania和1个Bangladesh 。 当我分析代码时,发现在插入数据之前,如果数据已经存在或者没有存在,我会检查数据( Country.prototype.find = function find(filters) ),但是总是返回空的结果。 那为什么它插入多个数据。 如果我再次运行index.js ,那么没有新的数据被插入到数据库中。 为了解决这个问题,我尝试了下面的CQL

  MERGE (c:Country { name: '${data.name}' } ) RETURN c 

它插入了独特的数据,但它杀了这么多时间。 然后我写了下面的代码:

事件驱动background-import-csv-file.js ):

 process.on("message", (file) => { stream = fs.createReadStream(file); csv .fromStream(stream, { headers: true }) .on("data", function(data) { countryData = { "name": data.name }; neo = new DB(); country = new Country(neo); country.find(countryData); country.on('find', resp => resp.length > 0 ? Q.resolve({ msg: `country: [${country.name}] is already exist` }) : Q.resolve(country.create(countryData)) ); country.on('create', resp => console.log(resp) ); }) .on("end", () => process.send("file read complete") ); }); 

./collection/country.js

  EventEmitter = require('events').EventEmitter; util = require('util'); Country = function Country(neo) { this.neo = neo; EventEmitter.call(this); }; util.inherits(Country, EventEmitter); Country.prototype.find = function find(filters) { query = `MATCH (a:Country { name: '${filters.name}' } ) RETURN {country:properties(a)}`; return this.neo.run(query, {}).then(resp => this.emit('find', resp)); } Country.prototype.create = function create(data) { query = `CREATE (ax:Country { name: '${data.name}' } ) RETURN ax `; return this.neo.run(query, {}).then(resp => this.emit('create', resp[0].properties)).catch(err => err) } 

而这一次,它显示了相同的结果。 我错过了什么? 任何build议将是非常有益的。

注意:我正在使用fast-csv进行csvparsing, Q使用promise。

其实我可以想象下面的解决scheme:

  1. 使用编程语言(如node.js)修改CSV文件本身以删除具有相同名称的重复行。
  2. 添加neo4j 唯一约束 CREATE CONSTRAINT ON (c:Country) ASSERT c.name IS UNIQUE
  3. 涉及中间件,就像一个防止重复项目的队列一样,为此,你需要定义你自己的消息结构和重复的算术。

以上。

我的问题是,在csv文件parsing,它是如此之快(事件驱动),不等待完成插入数据到数据库。 所以我必须暂停文件parsing,然后恢复它。

我使用下面的代码解决了我的问题:

基于Promise(background-import-csv-file.js):

 cp = require('child_process'); csv = require('fast-csv'); Q = require('q'); DB = require("./common/driver"); Country = require('./collection/country'); process.on("message", (file) => { stream = fs.createReadStream(file); csvstream = csv .fromStream(stream, { headers: true }) .on("data", function(data) { csvstream.pause(); // pause the csv file parsing countryData = { "name": data.name }; neo = new DB(); country = new Country(neo); country.insert(countryData) .then(resp => { process.send(resp.msg); neo.close(); return csvstream.resume(); // after completing db process, resume }) .catch(err => { process.send(err); return csvstream.resume(); // if failed, then resume }); }) .on("end", () => process.send("file read complete") ); });