如何阅读与node.js或JavaScript文件的行延迟,而不是在非阻塞行为?

我在node.js中读取一个文件(300,000行)。 我想要批量发送5000行的行到另一个应用程序(Elasticsearch)来存储它们。 所以每当我读完5000行时,我想通过一个API将它们批量发送给Elasticsearch,然后继续阅读文件的其余部分,每批发送5000行。

如果我想要使用java(或任何其他阻塞语言,如C,C ++,python等)来完成这个任务,我会这样做:

int countLines = 0; String bulkString = ""; BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt"))); while ((currentLine = br.readLine()) != null) { countLines++; bulkString += currentLine; if(countLines >= 5000){ //send bulkString to Elasticsearch via APIs countLines = 0; bulkString = ""; } } 

如果我想用node.js做同样的事情,我会做:

 var countLines = 0; var bulkString = ""; var instream = fs.createReadStream('filePath.txt'); var rl = readline.createInterface(instream, outstream); rl.on('line', function(line) { if(countLines >= 5000){ //send bulkString to via APIs client.bulk({ index: 'indexName', type: 'type', body: [bulkString] }, function (error, response) { //task is done }); countLines = 0; bulkString = ""; } } 

node.js 的问题在于它是非阻塞的,因此它在发送下一批行之前不会等待第一个API响应。 我知道这可以算作done.js的一个好处,因为它不会等待I / O,但问题是它将太多的数据发送到Elasticsearch。 因此,Elasticsearch的队列将会变满,并且会抛出exception。

我的问题是,如何让node.js在继续读取下一行之前或在将下一批行发送到Elasticsearch之前等待来自API的响应。

我知道我可以在Elasticsearch中设置一些参数来增加队列的大小,但是我对这个问题阻止了node.js的行为感兴趣。 我熟悉callback的概念,但是我不能想到在这种情况下使用callback的方法,以防止node.js以非阻塞模式调用Elasticsearch API。

皮埃尔的答案是正确的。 我只是想提交一个代码,说明我们如何从node.js的非阻塞概念中受益,但同时也不要一次性用太多的请求压倒Elasticsearch。

这是一个伪代码,您可以通过设置队列大小限制来提高代码的灵活性:

 var countLines = 0; var bulkString = ""; var queueSize = 3;//maximum of 3 requests will be sent to the Elasticsearch server var batchesAlreadyInQueue = 0; var instream = fs.createReadStream('filePath.txt'); var rl = readline.createInterface(instream, outstream); rl.on('line', function(line) { if(countLines >= 5000){ //send bulkString to via APIs client.bulk({ index: 'indexName', type: 'type', body: [bulkString] }, function (error, response) { //task is done batchesAlreadyInQueue--;//we will decrease a number of requests that are already sent to the Elasticsearch when we hear back from one of the requests rl.resume(); }); if(batchesAlreadyInQueue >= queueSize){ rl.pause(); } countLines = 0; bulkString = ""; } } 

在您的//task is done后,在您的if和rl.resume()之后使用rl.pause()

请注意,调用暂停后,您可能还有更多线路事件。