我如何使用参数化过滤创build一个类似Twitter的stream媒体API?

我正在尝试开发一个与Twitter的stream式API( https://dev.twitter.com/streaming/reference/post/statuses/filter )function相同的数据streamAPI,即具有过滤function的数据stream。 我正在生成大量数据,并希望将其提供给客户端。

我知道如何制作一个为所有客户端提供相同数据的应用程序。 这相对容易。 我遇到的困难来自于允许客户指定数据filter并为每个客户提供独特的数据。

原油图

我的想法:

首先我想打开stream式HTTP请求(如Twitter)。 我可以创build一个端点来接受带有参数的GET请求(例如https://stream.example.com/v1/filter.json?track=twitter )。 根据这个答案streamAPI和Rest API? ,这不容易扩展,需要大量的资源。

然后我想用websockets,让客户端提供一个过滤信息(例如locations = -122.75,36.8,-121.75,37.8)。 但是,我找不到一个WS服务器为每个客户端分配唯一数据的好例子。 如果inheritance了tornado.websocket.WebSocketHandler或类似的实现,这个类会是什么样子?

我还考虑将数据推送到消息服务(RabbitMQ)或数据库(Redis),并在客户端连接到其独特的频道时订阅客户端。 (我想这个问题任何想法如何创build参数化的streamapi? )。 我不知道创build独特渠道的有效方法。 这似乎也过于复杂。

我宁愿在Python中这样做,但我也会考虑使用Ruby和JS实现。

不太熟悉Python,但我认为这应该可以使用Websockets。 这里是我在Ruby中的承担,希望它有任何帮助。 这些是剥离的版本与大部分websocketfunction删除只是为了演示。

然而,对于使用stream媒体API的最佳实践,我恐怕没有什么帮助。

服务器

require 'em-websocket' require 'json' def filtered_stream(filter, ws) loop do # do something with filter, transform or send different kinds of data ws.send "#{filter} - hello" sleep 2 end end EM.run { EM::WebSocket.run(:host => "127.0.0.1", :port => 9999) do |ws| ws.onopen { |handshake| # We can access some information in the handshake object such as headers # Perhaps even add the client to a list / table } ws.onmessage { |msg| # use ws.close to close the connection if needed # for example if the client attempts to send an invalid message or multiple messages? filter = JSON.parse(msg)['locations'] Thread.new { filtered_stream(filter, ws) } } end } 

客户

 require 'websocket-eventmachine-client' EM.run do ws = WebSocket::EventMachine::Client.connect( :uri => 'ws://localhost:9999', ) ws.onopen do # Unsure on how to supply the filter, headers is an option too ws.send "{\"locations\":\"-122.75,36.8,-121.75,37.8\"}" end ws.onmessage do |msg| p msg end end