如何使用AMQP在“PeekLock”模式下从Azure服务总线队列获取消息?

我们正在尝试在Node应用程序中使用Azure服务总线。 我们的要求是从队列中获取多个消息

由于Azure SDK for Node不支持批量检索,因此我们决定使用AMQP。 虽然我们可以按照此处所述使用Peek邮件获取邮件( https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations )。

我们注意到,只要消息被提取,它们就会从队列中移除。 我想知道是否有人了解如何使用AMQP和节点以“PeekLock”模式获取消息。 对于AMQP,我们使用的是amqp10节点包( https://www.npmjs.com/package/amqp10 )。

这里是我们的邮件偷看的代码:

const AMQPClient = require('amqp10/lib').Client, Policy = require('amqp10/lib').Policy; const protocol = 'amqps'; const keyName = 'RootManageSharedAccessKey'; const sasKey = 'My Shared Access Key' const serviceBusHost = 'account-name.servicebus.windows.net'; const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost; const queueName = 'test1'; var client = new AMQPClient(Policy.ServiceBusQueue); client.connect(uri) .then(function () { return Promise.all([ client.createReceiver(queueName), client.createSender(queueName) ]); }) .spread(function(receiver, sender) { console.log(receiver); console.log(sender); console.log('--------------------------------------------------------------------------'); receiver.on('errorReceived', function(err) { // check for errors console.log(err); }); receiver.on('message', function(message) { console.log('Received message'); console.log(message); console.log('------------------------------------'); }); return sender.send([], { operation: 'com.microsoft:peek-message', 'message-count': 5 }); }) .error(function (e) { console.warn('connection error: ', e); }); 

默认情况下,接收器工作在自动定位模式下,您必须更改它以确定处理方式

 const { Constants } = require('amqp10') // // ...create client, connect, etc... // // Second parameter of createReceiver method enables overwriting policy parameters const receiver = client.createReceiver(queueName, { attach: { rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition } }) 

处理之后不要忘记接受/拒绝/发布消息:

 receiver.on('message', msg => { // // ...do something smart with a message... // receiver.accept(msg) // <- manually settle a message })