根据http请求从kafka获取最新消息

我正在使用kafka节点客户端作出restify api,当被调用返回最新的消息作为响应。 我使用下面的代码取得了一些成功。

var consumerOptions = { groupId: 'ExampleTestGroup', fetchMaxWaitMs:500, fromOffset: 'latest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest' }; consumerStream=new ConsumerStream(client,[{topic:'topic1'},{topic:'topic2'}],{fromOffset:'lates',fecthMaxWaitMs:500}); consumerStream.on('error',(err)=>{ console.log('err',err); }); consumerStream.pipe(Stringify()).pipe(res); setTimeout(()=>{ console.log('100 ms are up'); res.end(); next(); },1000); 

如果主题中没有足够的消息并返回响应,则必须使用超时。 这工作正常,但只在第一个请求,因为我假设偏移量是在新的请求到来时提交的,并且不会因此而提取任何消息。