停止函数被多次调用

我正在build立一个file upload组件,允许你暂停/恢复file upload。

实现这一目标的标准方法似乎是将文件分解成客户端计算机上的块,然后将块和书籍信息一起发送到服务器,服务器可以将块存储到暂存目录中,然后将它们合并在一起已经收到了所有的块。 所以,这就是我正在做的。

我正在使用节点/快递,我能够得到的文件很好,但我遇到了一个问题,因为我的merge_chunks函数被调用多次。

这是我的调用堆栈:

 router.post('/api/videos', upload.single('file'), validate_params, rename_uploaded_chunk, check_completion_status, merge_chunks, record_upload_date, videos.update, send_completion_notice ); 

check_completion_status函数实现如下:

 /* Recursively check to see if we have every chunk of a file */ var check_completion_status = function (req, res, next) { var current_chunk = 1; var see_if_chunks_exist = function () { fs.exists(get_chunk_file_name(current_chunk, req.file_id), function (exists) { if (current_chunk > req.total_chunks) { next(); } else if (exists) { current_chunk ++; see_if_chunks_exist(); } else { res.sendStatus(202); } }); }; see_if_chunks_exist(); }; 

暂存目录中的文件名称中embedded了块编号,因此我们的想法是查看是否有每个块编号的文件。 该函数只能对给定的(完整)文件进行next()一次。

但是,我的merge_chunks函数被调用多次。 (通常在1到4之间)日志logging确实显示它只我收到所有的块之后才被调用。

考虑到这一点,我的假设是fs.exists函数的asynchronous性导致了这个问题。

即使第n次调用check_completion_status可能发生在我拥有所有的块之前,当我们到达对fs.exists()n次调用时,更多的块可能已经到达并被同时处理,所以函数可以继续下去,在某些情况下,可以继续next() 。 然而,那些同时到达的块也将与check_completion_status调用相对应,这些调用也将到next()因为我们显然拥有所有这些文件。

这是造成的问题,因为我没有考虑到这一点,当我写merge_chunks

为了完整性,这里是merge_chunks函数:

 var merge_chunks = (function () { var pipe_chunks = function (args) { args.chunk_number = args.chunk_number || 1; if (args.chunk_number > args.total_chunks) { args.write_stream.end(); args.next(); } else { var file_name = get_chunk_file_name(args.chunk_number, args.file_id) var read_stream = fs.createReadStream(file_name); read_stream.pipe(args.write_stream, {end: false}); read_stream.on('end', function () { //once we're done with the chunk we can delete it and move on to the next one. fs.unlink(file_name); args.chunk_number += 1; pipe_chunks(args); }); } }; return function (req, res, next) { var out = path.resolve('videos', req.video_id); var write_stream = fs.createWriteStream(out); pipe_chunks({ write_stream: write_stream, file_id: req.file_id, total_chunks: req.total_chunks, next: next }); }; }()); 

目前,我收到一个错误,因为函数的第二次调用试图读取已经被第一次调用删除的块。

处理这种情况的典型模式是什么? 如果可能,我想避免有状态的架构。 在check_completion_status中调用next()之前是否可以取消挂起的处理程序?

如果你只是想让它工作尽快,我会使用一个锁(很像一个数据库锁)来locking资源,以便只有一个请求处理块。 只需在客户端上创build一个唯一的ID,并将其与块一起发送。 然后把这个唯一的id存储在某种数据结构中,然后在处理之前查看这个id。 下面的例子目前还不是最优的(实际上这张地图会不断增长,这是不好的),但它应该展示这个概念

 // Create a map (an array would work too) and keep track of the video ids that were processed. This map will persist through each request. var processedVideos = {}; var check_completion_status = function (req, res, next) { var current_chunk = 1; var see_if_chunks_exist = function () { fs.exists(get_chunk_file_name(current_chunk, req.file_id), function (exists) { if (processedVideos[req.query.uniqueVideoId]){ res.sendStatus(202); } else if (current_chunk > req.total_chunks) { processedVideos[req.query.uniqueVideoId] = true; next(); } else if (exists) { current_chunk ++; see_if_chunks_exist(); } else { res.sendStatus(202); } }); }; see_if_chunks_exist(); };