NodeJS – 响应stream

我使用Sails.js构build了一个使用NodeJS的简单API端点。

当有人访问我的API端点时,服务器开始等待数据,每当出现新数据时,他都使用套接字广播它。 每个客户应该根据他的用户input接收他自己的数据stream。

var Cap = require('cap').Cap; collect: function (req, res) { var iface = req.param("ip"); var c = new Cap(), device = Cap.findDevice(ip); c.on('data', function(myData) { sails.sockets.blast('message', {"host": myData}); }); }); 

响应没有完成(我从来没有发送res.json() – 实际上发生的事情是浏览器不断加载 – 但上述function)。

2问题:

  • 我正尝试从我的客户端(使用RxJS)订阅和取消订阅此API端点。 当我订阅时,我开始通过套接字接收数据,但我不能退订API端点(浏览器期望完成请求)。

  • 每个客户端都应该根据请求IP参数订阅他自己的套接字空间(请参阅更新后的代码)。 目前它把这个信息传达给每个人。

如何用Sails.js创build一个类似Stream / Service的API端点,并根据他的input向每个用户发送新的数据?

我的目标是能够从每个客户端订阅/取消订阅此API端点。

修订答案

假设您的API端点在config/routes.js定义如下:

 ... 'get /collect': 'SomeController.collectSubscribe', 'delete /collect': 'SomeController.collectUnsubscribe', 

由于每个Cap实例绑定到一个设备,因此每个订阅都需要一个实例。 而不是使用sails join / leave方法,我们跟踪内存中的Cap实例,只是broadcast到请求套接字的ID。 这是有效的,因为Sails套接字默认订阅了自己的ID。

api/controllers/SomeController.js

 // In order for the `Cap` instances to persist after `collectSubscribe` finishes, we store them all in an Object, associated with which socket the were created for. var caps = {/* req.socket.id: <instance of Cap>, */}; module.exports = { ... collectSubscribe: function(req, res) { if (!res.isSocket) return res.badRequest("I need a websocket! Help!"); if (!!caps[req.socket.id]) return res.badRequest("Dude, you are already subscribed."); caps[req.socket.id] = new Cap(); var c = caps[req.socket.id]; // remember that `c` is a reference to our new `Cap`, not a copy. var device = c.findDevice(req.param('ip')); c.open(device, ...); c.on('data', function(myData) { sails.sockets.broadcast(req.socket.id, 'message', {host: myData}); }); return res.ok(); }, collectUnsubscribe: function(req, res) { if (!res.isSocket) return res.badRequest("I need a websocket! Help!"); if (!caps[req.socket.id]) return res.badRequest("I can't unsubscribe you unless you actually subscribe first."); caps[req.socket.id].removeAllListeners('data'); delete caps[req.socket.id]; return res.ok(); } } 

基本上,它是这样的:当浏览器请求触发collectSubscribe ,一个新的Cap实例监听提供的IP。 当浏览器触发collectUnsubscribe ,服务器检索Cap实例,告诉它停止监听,然后删除它。

生产注意事项 :请注意,Caps列表不是持续的(因为它存储在内存中而不是数据库)! 所以,如果你的服务器被closures并重新启动(由于雷雨天气等),清单将被清除,但考虑到所有的websocket连接将被丢弃,我不觉得有任何担心。

老答案,保留参考

您可以使用sails.sockets.join(req, room)sails.sockets.leave(req, room)来pipe理套接字间。 基本上你有一个叫“收集”的房间,只有join这个房间的套接字才会收到一个sails.sockets.broadcast(room, eventName, data)

有关如何在这里用户sails.sockets更多信息。

api/controllers/SomeController.js

 collectSubscribe: function(req, res) { if (!res.isSocket) return res.badRequest(); sails.sockets.join(req, 'collect'); return res.ok(); }, collectUnsubscribe: function(req, res) { if (!res.isSocket) return res.badRequest(); sails.sockets.leave(req, 'collect'); return res.ok(); } 

最后,我们需要告诉服务器将消息广播到我们的'collect'房间。 请注意,这只需要发生一次 ,所以你可以在config/目录下的一个文件中进行。

在这个例子中,我将把它放在config/sockets.js

 module.exports = { // ... }; c.on('data', function(myData) { var eventName = 'message'; var data = {host: myData}; sails.sockets.broadcast('collect', eventName, data); }); 

我假设c在这里可以访问; 如果没有,你可以将其定义为sails.c = ...使其可以全局访问。