Node.js堆溢出错误

运行此代码1-2天后,我在AWS EC2上收到此错误

错误

<--- Last few GCs ---> st[10805:0x41cdff0] 7130379 ms: Mark-sweep 33.2 (78.7) -> 21.1 (75.8) MB, 13.8 / 0.1 ms (+ 23.1 ms in 23 steps since start of marking, biggest step 4.3 ms, walltime since start of marking 160 ms) final$ <--- JS stacktrace ---> Cannot get stack trace in GC. FATAL ERROR: Scavenger: promoting marked Allocation failed - process out of memory 1: node::Abort() [node] 2: 0x12b288c [node] 3: v8::Utils::ReportOOMFailure(char const*, bool) [node] 4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node] 5: 0xa96bfb [node] 6: void v8::internal::ScavengingVisitor<(v8::internal::MarksHandling)0, (v8::internal::PromotionMode)0, (v8::internal::LoggingAndProfiling)1>::EvacuateObject<(v8::internal::ScavengingVisitor<(v8::intern$ 7: v8::internal::Scavenger::ScavengeObject(v8::internal::HeapObject**, v8::internal::HeapObject*) [node] 8: v8::internal::Heap::IteratePromotedObjectPointers(v8::internal::HeapObject*, unsigned char*, unsigned char*, bool, void (*)(v8::internal::HeapObject**, v8::internal::HeapObject*)) [node] 9: void v8::internal::BodyDescriptorBase::IterateBodyImpl<v8::internal::ObjectVisitor>(v8::internal::HeapObject*, int, int, v8::internal::ObjectVisitor*) [node] 10: void v8::internal::BodyDescriptorApply<v8::internal::CallIterateBody, void, v8::internal::HeapObject*, int, v8::internal::ObjectVisitor*>(v8::internal::InstanceType, v8::internal::HeapObject*, int, v$ 11: v8::internal::Heap::DoScavenge(v8::internal::ObjectVisitor*, unsigned char*, v8::internal::PromotionMode) [node] 12: v8::internal::Heap::Scavenge() [node] 13: v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [node] 14: v8::internal::Heap::CollectGarbage(v8::internal::GarbageCollector, v8::internal::GarbageCollectionReason, char const*, v8::GCCallbackFlags) [node] 15: v8::internal::Factory::NewRawTwoByteString(int, v8::internal::PretenureFlag) [node] 16: v8::internal::Factory::NewStringFromUtf8(v8::internal::Vector<char const>, v8::internal::PretenureFlag) [node] 17: v8::String::NewFromUtf8(v8::Isolate*, char const*, v8::String::NewStringType, int) [node] 18: node::StringBytes::Encode(v8::Isolate*, char const*, unsigned long, node::encoding) [node] 19: void node::Buffer::StringSlice<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&) [node] 20: 0x33c699f18dcf 

我的主要function是一个async while循环,看起来像这样,这是一个快速路线的控制器function

 function controller(cb) { return new Promise((resolve, reject) => { let killed = false; (async() => { let isEmpty = false; while (!killed && !isEmpty) { const code = await processBatch(); if (code === EMPTY_QUEUE) { isEmpty = true; console.log('ss'); resolve(false); } } })(); cb() .then((state) => killed = state); }); } 

在这里, processBatch()可能需要大约10秒来解决承诺

注意: processBatch永远不会返回EMPTY_QUEUE并且死亡从不会被callback设置为true

考虑到这一点,有人可以告诉我为什么这个控制器function在一段时间之后消耗这么多的内存,我正在做一些阻止节点垃圾收集数据或类似的东西?

更新

这是调用控制器function的路由器代码,并确保一次只能有一个控制器工作

 const query = require('../controllers/fetchContent').query; const controller = require('../../storage/controllers/index').controller; let isFetching = false; let killed = false; function killSwitch () { return new Promise((resolve, reject) => { setInterval(() => { if(killed) { resolve(killed); } }, 10000); }) } module.exports = (app) => { app.get('/api', (req, res) => { res.setHeader('Content-Type', 'application/json'); res.json({"statusCode" : 200, "body" : "Hey"}) }); app.post('/', (req, res) => { if(!killed) { if (!isFetching) { isFetching = true; controller(killSwitch) .then((response) => { isFetching = response.isFetching; }); res.send({ success: true, message: 'Okay I will extract send the contents to the database' }) } else { res.send({ success: true, message: 'Already Fetching' }) } } else { res.send({ success: false, message: 'In killed State, start to continue' }) } }); app.post('/kill', (req, res) => { killed = true; isFetching = false; res.send(200, 'Okay I have stopped the fetcher process') }); app.post('/alive', (req, res) => { killed = false; res.send({ success: true, message: 'Now New req to / will be entertained' }) }); app.post('/api/fetch', query); }; 

更新2

这是processBatch()函数,它的作用是从Amazon SQS获取数据,并在处理该数据后将其发送到另一个Amazon SQS并通过Amazon SNS通知订户。

 async function processBatch() { let data = await getDataFromQueue();// Wait for the promise returned after messages are retrieved from the Queue. let listOfReceipt = []; if (q.length() > 50 ) { // if queue length is more than 50 then wait for queue to process previous data ( done in order to put a max cap on queue size ) await sleep(400); console.log(q.length()); return CLEAN_EXIT; } //Also get the ReceiptHandles for those messages. (To be used for deletion later on) if (!data.Messages || !data.Messages.length) { pushSNS(null, true); pushDelete(null, true); return EMPTY_QUEUE; } try { for (let i = 0; i < data.Messages.length; i++) { data.Messages[i].Body = JSON.parse(data.Messages[i].Body); const URL = data.Messages[i].Body.url; const identifier = data.Messages[i].Body.identifier; listOfReceipt.push(data.Messages[i].ReceiptHandle);// get the ReceiptHandle out of the message. q.push(URL, async (err, html) => { if (err) { console.log(err); } else { await sendDataToQueue({url: URL, content: html, identifier}); pushDelete(data.Messages[i].ReceiptHandle); pushSNS(); } }); } } catch (e) { console.log(e); pushSNS(null, true); pushDelete(null, true); return CLEAN_EXIT; // simply ignore any error and delete that msg } return CLEAN_EXIT; } 

这里q是Async.queue,它是工作者函数的,即extractContent作用是获取提供的URL的内容。

这个模块有帮助function。

 const q = async.queue((URL, cb) => { extractContent(URL, array) .then((html) => { cb(null,html); }) .catch((e) => { cb(e); }) }, concurrency); function internalQueue(cb) { let arr = []; return function (message, flag) { arr.push(message); if(arr.length >= 10 || flag) { arr = []; cb(); } } } function sleep (delay) { return new Promise ((resolve, reject) => { setTimeout(() => resolve(), delay) }) } // this is done in order to do things in a batch, this reduces cost let pushSNS = internalQueue(sendDataToSNS); let pushDelete = internalQueue(deleteDataFromSQS); 

首先你的controller函数返回一个Promise,根据你的语句永远不会parsingprocessBatch永远不会返回EMPTY_QUEUE 。 我假设你在某处存储了返回的Promise,并且每个都消耗了内存。

另外,每次调用controller函数时,都会创build一个无限调用processBatch的新循环。 因此,如果controller 是快速路由的控制器function ,那么每当有人请求该路由时,就会创build一个无限地调用processBatch的新循环。 我敢打赌,这不是一个理想的行为,它绝对阻止大量的内存。

由于新的细节更新:

目前,如果有人将post on / kill ,然后POST'/ alive',她将能够在/ POST上启动另一个循环,因为processBatch 可能需要大约10秒来解决承诺 。 这样,如果有人将几个重复的POST发送到/ kill – > / alive – > / ,她将会有效地阻止你的应用程序。 可能这是发生了什么事情。

另一个更新

这个代码q.push(URL, async (err, html) => {开始一个新的查询并附加一个callback函数,在查询完成后调用q计数器在调用callback之前减less,但callback是asynchronous的( async ),并做了另一个查询await sendDataToQueue({url: URL, content: html, identifier});

正如你所看到的,如果sendDataToQueue执行比q慢,那么callback累积并消耗内存。