填充队列时Azure函数中的超时

我们有一个简单的ETL过程来从一个API中提取数据到一个我们想用函数实现的文档数据库。 简单地说,这个过程是取一个约16,500行文件,从每行(function1)中提取一个ID,为每个ID(function2)build立一个URL,使用URL(function3)打一个API,存储响应在文件DB(function4)中。 我们正在使用队列进行function间通信,并在执行此操作时在第一个function中看到超时问题。

函数1(index.js)

module.exports = function (context, odsDataFile) { context.log('JavaScript blob trigger function processed blob \n Name:', context.bindingData.odaDataFile, '\n Blob Size:', odsDataFile.length, 'Bytes'); const odsCodes = []; odsDataFile.split('\n').map((line) => { const columns = line.split(','); if (columns[12] === 'A') { odsCodes.push({ 'odsCode': columns[0], 'orgType': 'pharmacy', }); } }); context.bindings.odsCodes = odsCodes; context.log(`A total of: ${odsCodes.length} ods codes have been sent to the queue.`); context.done(); }; 

function.json

 { "bindings": [ { "type": "blobTrigger", "name": "odaDataFile", "path": "input-ods-data", "connection": "connecting-to-services_STORAGE", "direction": "in" }, { "type": "queue", "name": "odsCodes", "queueName": "ods-org-codes", "connection": "connecting-to-services_STORAGE", "direction": "out" } ], "disabled": false } 

完整代码在这里

当ID的数量在100以上时,这个函数可以正常工作,但是在1000以后的10个数字中超时。 ID数组的构build以毫秒为单位发生,函数完成,但是将项添加到队列中似乎花费了很多分钟,最终导致默认为5分钟的超时。

令我感到惊讶的是,填充队列的简单行为似乎需要很长时间,并且函数的超时似乎包括函数外部的任务(即队列总数)的时间。 这是预期的吗? 有更多高性能的方法吗?

我们正在消费(dynamic)计划下运行。

我从我的本地机器做了一些testing,发现在预期的队列中插入一条消息需要200ms。 所以如果你有17k条消息要插入,并按顺序进行,时间将会是:

17,000条消息* 200ms = 3,400,000ms或约56分钟

当从云端运行时,延迟可能会更快一些,但是当插入那么多的消息时,你可以很快看到这会跳过5分钟。

如果消息顺序不重要,则可以并行插入消息。 但是有一些注意事项:

  1. 你不能用节点做这个 – 它必须是C#。 节点不向您公开IAsyncCollector接口,所以它在幕后。
  2. 您不能并行插入所有内容,因为消费计划一次最多可以有250个networking连接。

下面是一个例子,一次加载插入200个 – 有17k个消息,在我的快速testing中,这花了一分钟。

 public static async Task Run(string myBlob, IAsyncCollector<string> odsCodes, TraceWriter log) { List<Task> tasks = new List<Task>(); string[] lines = myBlob.Split(Environment.NewLine.ToCharArray(), StringSplitOptions.RemoveEmptyEntries); int skip = 0; int take = 200; IEnumerable<string> batch = lines.Skip(skip).Take(take); while (batch.Count() > 0) { await AddBatch(batch, odsCodes); skip += take; batch = lines.Skip(skip).Take(take); } } public static async Task AddBatch(IEnumerable<string> lines, IAsyncCollector<string> odsCodes) { List<Task> tasks = new List<Task>(); foreach (string line in lines) { tasks.Add(odsCodes.AddAsync(line)); } await Task.WhenAll(tasks); } 

正如其他答案已经指出,因为Azure队列没有​​批量API,你应该考虑一个替代,如服务总线队列。 但是,如果您坚持使用Azure队列,则需要避免按顺序输出队列项,即某种forms的约束并行是必要的。 达到此目的的一种方法是使用TPL数据stream库 。

数据stream必须使用批处理任务并执行WhenAll(..)的一个好处是,您将永远不会有一个批处理快完成的情况,并且在开始下一个批处理之前,您正在等待一个缓慢的执行完成。

我比较了插入10000个任务批量为32的数据stream和数据stream并行性设置为32.批处理完成60秒,数据stream几乎完成了一半(32秒)。

代码看起来像这样:

  using System.Threading.Tasks.Dataflow; ... var addMessageBlock = new ActionBlock<string>(async message => { await odscodes.AddAsync(message); }, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true, MaxDegreeOfParallelism = 32}); var bufferBlock = new BufferBlock<string>(); bufferBlock.LinkTo(addMessageBlock, new DataflowLinkOptions { PropagateCompletion = true }); foreach(string line in lines) bufferBlock.Post(line); bufferBlock.Complete(); await addMessageBlock.Completion;