Memory-efficient message chunk processing with XMLHttpRequest

I have an XMLHttpRequest with a progress event handler that is requesting a chunked page which continuously sends additional message chunks. If I do not set a responseType, I can access the response property of the XMLHttpRequest in each progress event and handle the additional message chunk. The problem with this approach is that the browser must keep the entire response in memory, and eventually the browser will crash because of this memory waste.
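
For reference, here is a minimal sketch of the variant that does work but wastes memory, i.e. the default responseType, where the text of the response stays readable during progress events (handleChunk() is just a placeholder for whatever actually processes a chunk):

    var xhr = new XMLHttpRequest();    // default responseType '' keeps responseText readable
    var offset = 0;                    // how much of the response has already been handled
    xhr.addEventListener('progress', function() {
        var chunk = xhr.responseText.substring(offset); // only the newly arrived part
        offset = xhr.responseText.length;
        handleChunk(chunk);            // placeholder for the actual chunk processing
        // xhr.responseText keeps growing, so memory usage grows with the stream
    }, false);
    xhr.open('GET', '/stream', true);
    xhr.send();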

So, I tried a responseType of arraybuffer in the hope that I could slice the buffer and thereby prevent the previous memory waste. Unfortunately, the progress event handler is no longer able to read the response property of the XMLHttpRequest at that point. The event argument of the progress event does not contain the buffer, either. Here is a short, self-contained example of my attempt at this (it is written for node.js):

    var http = require('http');

    // -- The server.

    http.createServer(function(req, res) {
        if (req.url === '/stream') return serverStream(res);
        serverMain(res);
    }).listen(3000);

    // -- The server functions to send a HTML page with the client code, or a stream.

    function serverMain(res) {
        res.writeHead(200, {'Content-Type': 'text/html'});
        res.write('<html><body>Hello World</body><script>');
        res.end(client.toString() + ';client();</script></html>');
    }

    function serverStream(res) {
        res.writeHead(200, {'Content-Type': 'text/html'});
        setInterval(function() {
            res.write('Hello World<br />\n');
        }, 1000);
    }

    // -- The client code which runs in the browser.

    function client() {
        var xhr = new XMLHttpRequest();
        xhr.addEventListener('progress', function() {
            if (!xhr.response) return console.log('progress without response :-(');
            console.log('progress: ' + xhr.response.size);
        }, false);
        xhr.open('GET', '/stream', true);
        xhr.responseType = 'arraybuffer';
        xhr.send();
    }

The progress event handler has no access to the response I want. How can I handle the message chunks in the browser in a memory-efficient way? Please do not suggest a WebSocket. I do not wish to use one just to process a read-only stream of message chunks.

XMLHttpRequest does not seem to be really designed for this kind of usage. The obvious solution is polling, which is a popular use of XMLHttpRequest, but I am guessing you do not want to miss data from your stream that would slip between the calls.
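
For comparison, polling would look roughly like the sketch below. It only avoids losing data if the server can replay everything emitted after a given event id; the /poll endpoint, its since parameter and the JSON shape are invented here for illustration, and handleChunk() is a placeholder:

    var lastId = null;                   // id of the last chunk that was processed

    function poll() {
        var xhr = new XMLHttpRequest();
        xhr.onload = function() {
            // assumed response shape: { "chunks": [ { "id": ..., "data": ... }, ... ] }
            var body = JSON.parse(xhr.responseText);
            body.chunks.forEach(function(chunk) {
                lastId = chunk.id;
                handleChunk(chunk.data); // placeholder for the actual processing
            });
            setTimeout(poll, 1000);      // ask again in a second
        };
        // hypothetical endpoint returning every chunk emitted after "since"
        xhr.open('GET', '/poll' + (lastId === null ? '' : '?since=' + encodeURIComponent(lastId)), true);
        xhr.send();
    }

    poll();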

To my question "Can the 'real' data chunks be identified in some way or is it basically random data?", you answered "With some effort, the chunks could be identified by adding an event-id of sorts to the server-side".

Based on that premise, I propose:

The idea: cooperating concurrent listeners

  1. Connect to the stream and set up a progress listener (call it listenerA()).
  2. When a chunk arrives, process it and output it. Keep a reference to the ids of both the first and the last chunk received by listenerA(). Count how many chunks listenerA() has received.
  3. After listenerA() has received a certain amount of chunks, spawn another "thread" (connection + listener, listenerB()) doing steps 1 and 2 in parallel with the first one, but keep the processed data in a buffer instead of outputting it.
  4. When listenerA() receives the chunk with the same id as the first chunk received by listenerB(), send a signal to listenerB(), drop the first connection and kill listenerA().
  5. When listenerB() receives the kill signal from listenerA(), dump the buffer to the output and keep processing normally.
  6. Have listenerB() spawn listenerC() on the same conditions as before.
  7. Keep repeating with as many connections + listeners as necessary.

By using two overlapping connections, you prevent the loss of chunks that could result from dropping a single connection and then reconnecting.
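
Below is a trimmed-down sketch of that handoff (the full proof of concept is at the end of this answer). It assumes the [node id="..."] chunk format and the #output element of the test page further down, keeps at most two overlapping connections, and leaves out the stop/cleanup plumbing, so treat it as an illustration rather than the final implementation:

    function listenSketch(url, chunkMax) {
        var output = document.getElementById('output');
        var listeners = [];                      // oldest first; only listeners[0] writes output

        function spawn() {
            var xhr = new XMLHttpRequest();
            var me = { xhr: xhr, parsed: 0, ids: [], buffer: [], dropIds: [], spawned: false };
            listeners.push(me);

            xhr.addEventListener('progress', function() {
                // parse only the complete chunks that arrived since the last event
                var re = /\[node id="([a-z0-9]+)"\]/g;
                re.lastIndex = me.parsed;
                var match;
                while ((match = re.exec(xhr.responseText))) {
                    me.parsed = re.lastIndex;
                    me.ids.push(match[1]);
                    if (listeners[0] === me) {
                        // active listener: output directly, unless a predecessor already did
                        if (me.dropIds.indexOf(match[1]) === -1) {
                            output.innerHTML += match[0] + '<br />';
                        }
                    } else {
                        // standby listener: buffer until it takes over
                        me.buffer.push(match[0]);
                    }
                }

                if (listeners[0] !== me) return; // handoff logic runs on the active listener only

                // enough chunks received: open the overlapping successor connection
                if (!me.spawned && me.ids.length >= chunkMax) {
                    me.spawned = true;
                    spawn();
                }

                // the active listener has seen the successor's first chunk, so the two
                // connections overlap and the successor can take over without losing data
                var next = listeners[1];
                if (next && next.ids.length && me.ids.indexOf(next.ids[0]) > -1) {
                    xhr.abort();                 // kill the old connection and its memory
                    listeners.shift();
                    next.dropIds = me.ids;       // everything in here was already output
                    for (var i = 0; i < next.ids.length; i++) {
                        if (me.ids.indexOf(next.ids[i]) === -1) {
                            output.innerHTML += next.buffer[i] + '<br />';
                        }
                    }
                    next.buffer = [];            // from now on it outputs directly
                }
            }, false);

            xhr.open('GET', url, true);          // responseType stays '' / 'text' (see notes)
            xhr.send();
        }

        spawn();
    }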

Notes

  • This assumes the data stream is the same for all connections and does not introduce some individualized settings.
  • Depending on the output rate of the stream and the connection delay, the buffer dump during the transition from one connection to the other might be noticeable.
  • You could also measure the total response size rather than the chunk count to decide when to switch to a new connection.
  • It might be necessary to keep a complete list of chunk ids to compare against rather than just the first and the last, because we cannot guarantee the timing of the overlap.
  • The responseType of the XMLHttpRequest has to be set to its default value of "" or "text", to return text. Other datatypes will not return a partial response. See https://xhr.spec.whatwg.org/#the-response-attribute

Test server in node.js

The following code is a node.js server that streams a consistent output of elements for testing purposes. You can open multiple connections to it; the output will be the same across sessions, minus possible server lag.

http://localhost:5500/stream

will return data where the id is an incremented number

http://localhost:5500/streamRandom

will return data where the id is a random 40 character long string. This is meant to test a scenario where the id cannot be relied upon for ordering the data.

    var crypto = require('crypto');

    // init + update nodeId
    var nodeId = 0;
    var nodeIdRand = '0000000000000000000000000000000000000000';
    setInterval(function() {
        // regular id
        ++nodeId;
        //random id
        nodeIdRand = crypto.createHash('sha1').update(nodeId.toString()).digest('hex');
    }, 1000);

    // create server (port 5500)
    var http = require('http');
    http.createServer(function(req, res) {
        if(req.url === '/stream') {
            return serverStream(res);
        }
        else if(req.url === '/streamRandom') {
            return serverStream(res, true);
        }
    }).listen(5500);

    // serve nodeId
    function serverStream(res, rand) {

        // headers
        res.writeHead(200, {
            'Content-Type'                : 'text/plain',
            'Access-Control-Allow-Origin' : '*',
        });

        // remember last served id
        var last = null;

        // output interval
        setInterval(function() {

            // output on new node
            if(last != nodeId) {
                res.write('[node id="'+(rand ? nodeIdRand : nodeId)+'"]');
                last = nodeId;
            }

        }, 250);
    }
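
To see what either endpoint produces, you can watch the stream from the command line before pointing the browser at it (curl's -N flag disables output buffering; the exact ids depend on when you connect):

    $ curl -N http://localhost:5500/stream
    [node id="12"][node id="13"][node id="14"]...

    $ curl -N http://localhost:5500/streamRandom
    [node id="<40 character sha1 hash>"][node id="<another sha1 hash>"]...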

Proof of concept, using the aforementioned node.js server code

    <!DOCTYPE html>
    <html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    </head>
    <body>
        <button id="stop">stop</button>
        <div id="output"></div>
        <script>
        /*
            Listening to a never ending page load (http stream) without running out of
            memory by using concurrent overlapping connections to prevent loss of data,
            using only xmlHttpRequest, under the condition that the data can be identified.

            listen arguments
                url         url of the http stream
                chunkMax    number of chunks to receive before switching to new connection

            listen properties
                output          a reference to a DOM element with id "output"
                queue           an array filled with non-duplicate received chunks and metadata
                lastFetcherId   an incrementing number used to assign an id to new fetchers
                fetchers        an array listing all active fetchers

            listen methods
                fire        internal use    fire an event
                stop        external use    stop all connections
                fetch       internal use    starts a new connection
                fetchRun    internal use    initialize a new fetcher object

            Usage
                var myListen = new listen('http://localhost:5500/streamRandom', 20);
                    will listen to url "http://localhost:5500/streamRandom"
                    will switch connections every 20 chunks

                myListen.stop()
                    will stop all connections in myListen
        */
        function listen(url, chunkMax) {

            // main ref
            var that = this;

            // output element
            that.output = document.getElementById('output');

            // main queue
            that.queue = [];

            // last fetcher id
            that.lastFetcherId = 0;

            // list of fetchers
            that.fetchers = [];

            //********************************************************* event dispatcher
            that.fire = function(name, data) {
                document.dispatchEvent(new CustomEvent(name, {'detail':data}));
            }

            //******************************************************** kill all fetchers
            that.stop = function() {
                that.fire('fetch-kill', -1);
            }

            //************************************************************** url fetcher
            that.fetch = function(fetchId, url, fetchRef) {

                //console.log('start fetcher #'+fetchId);
                var len = 0;
                var xhr = new XMLHttpRequest();
                var cb_progress;
                var cb_kill;

                // progress listener
                xhr.addEventListener('progress', cb_progress = function(e) {

                    // extract chunk data
                    var chunkData = xhr.response.substr(len);

                    // chunk id
                    var chunkId = chunkData.match(/id="([a-z0-9]+)"/)[1];

                    // update response end point
                    len = xhr.response.length;

                    // signal end of chunk processing
                    that.fire('chunk-ready', {
                        'fetchId'   : fetchId,
                        'fetchRef'  : fetchRef,
                        'chunkId'   : chunkId,
                        'chunkData' : chunkData,
                    });

                }, false);

                // kill switch
                document.addEventListener('fetch-kill', cb_kill = function(e) {

                    // kill this fetcher or all fetchers (-1)
                    if(e.detail == fetchId || e.detail == -1) {

                        //console.log('kill fetcher #'+fetchId);

                        xhr.removeEventListener('progress', cb_progress);
                        document.removeEventListener('fetch-kill', cb_kill);

                        xhr.abort();

                        that.fetchers.shift(); // remove oldest fetcher from list

                        xhr = null;
                        delete xhr;
                    }

                }, false);

                // go
                xhr.open('GET', url, true);
                xhr.responseType = 'text';
                xhr.send();
            };

            //****************************************************** start a new fetcher
            that.fetchRun = function() {

                // new id
                var id = ++that.lastFetcherId;

                //console.log('create fetcher #'+id);

                // create fetcher with new id
                var fetchRef = {
                    'id'           : id,    // self id
                    'queue'        : [],    // internal queue
                    'chunksIds'    : [],    // retrieved ids, also used to count
                    'hasSuccessor' : false, // keep track of next fetcher spawn
                    'ignoreId'     : null,  // when set, ignore chunks until this id is received (this id included)
                };
                that.fetchers.push(fetchRef);

                // run fetcher
                that.fetch(id, url, fetchRef);
            };

            //************************************************ a fetcher returns a chunk
            document.addEventListener('chunk-ready', function(e) {

                // shorthand
                var f = e.detail;

                // ignore flag is not set, process chunk
                if(f.fetchRef.ignoreId == null) {

                    // store chunk id
                    f.fetchRef.chunksIds.push(f.chunkId);

                    // create queue item
                    var queueItem = {'id':f.chunkId, 'data':f.chunkData};

                    // chunk is received from oldest fetcher
                    if(f.fetchId == that.fetchers[0].id) {

                        // send to main queue
                        that.queue.push(queueItem);

                        // signal queue insertion
                        that.fire('queue-new');
                    }
                    // not oldest fetcher
                    else {

                        // use fetcher internal queue
                        f.fetchRef.queue.push(queueItem);
                    }
                }
                // ignore flag is set, current chunk id the one to ignore
                else if(f.fetchRef.ignoreId == f.chunkId) {

                    // disable ignore flag
                    f.fetchRef.ignoreId = null;
                }

                //******************** check chunks count for fetcher, threshold reached
                if(f.fetchRef.chunksIds.length >= chunkMax && !f.fetchRef.hasSuccessor) {

                    // remember the spawn
                    f.fetchRef.hasSuccessor = true;

                    // spawn new fetcher
                    that.fetchRun();
                }

                /***********************************************************************
                check if the first chunk of the second oldest fetcher exists in the
                oldest fetcher. If true, then they overlap and we can kill the oldest fetcher
                ***********************************************************************/
                if(
                    // is this the oldest fetcher ?
                    f.fetchId == that.fetchers[0].id
                    // is there a successor ?
                    && that.fetchers[1]
                    // has oldest fetcher received the first chunk of its successor ?
                    && that.fetchers[0].chunksIds.indexOf(
                        that.fetchers[1].chunksIds[0]
                    ) > -1
                ) {

                    // get index of last chunk of the oldest fetcher within successor queue
                    var lastChunkId = that.fetchers[0].chunksIds[that.fetchers[0].chunksIds.length-1]
                    var lastChunkIndex = that.fetchers[1].chunksIds.indexOf(lastChunkId);

                    // successor has not reached its parent last chunk
                    if(lastChunkIndex < 0) {

                        // discard whole queue
                        that.fetchers[1].queue = [];
                        that.fetchers[1].chunksIds = [];

                        // set ignore id in successor to future discard duplicates
                        that.fetchers[1].ignoreId = lastChunkId;
                    }
                    // there is overlap
                    else {

                        /**
                        console.log('triming queue start: '+that.fetchers[1].queue.length
                            +" "+(lastChunkIndex+1)
                            +" "+(that.fetchers[1].queue.length-1)
                        );
                        /**/

                        var trimStart = lastChunkIndex+1;
                        var trimEnd = that.fetchers[1].queue.length-1;

                        // trim queue
                        that.fetchers[1].queue = that.fetchers[1].queue.splice(trimStart, trimEnd);
                        that.fetchers[1].chunksIds = that.fetchers[1].chunksIds.splice(trimStart, trimEnd);

                        //console.log('triming queue end: '+that.fetchers[1].queue.length);
                    }

                    // kill oldest fetcher
                    that.fire('fetch-kill', that.fetchers[0].id);
                }

            }, false);

            //***************************************************** main queue processor
            document.addEventListener('queue-new', function(e) {

                // process chunks in queue
                while(that.queue.length > 0) {

                    // get chunk and remove from queue
                    var chunk = that.queue.shift();

                    // output item to document
                    if(that.output) {
                        that.output.innerHTML += "<br />"+chunk.data;
                    }
                }

            }, false);

            //****************************************************** start first fetcher
            that.fetchRun();
        };

        // run
        var process = new listen('http://localhost:5500/streamRandom', 20);

        // bind global kill switch to button
        document.getElementById('stop').addEventListener('click', process.stop, false);
        </script>
    </body>
    </html>