RxJS + node.js HTTP服务器实现?

在RxJS实现之前,我一直使用node.js。

这是我的试用码学习 –

Reactive-Extensions / rxjs-node https://github.com/Reactive-Extensions/rxjs-node


rx_http.js
(node.js的http lib的RxJS包装器)

var Rx = require("./rx.min"); var http = require("http"); for(var k in http) { exports[k] = http[k]; } exports.createServer = function () { var subject = new Rx.AsyncSubject(); var observable = subject.asObservable(); observable.server = http.createServer(function (request, response) { subject.onNext({ request:request, response:response }); subject.onCompleted(); }); return observable; }; 

server.js

 var http = require('./rx_http'); // rxServer var serverObservable = http.createServer(); var port = 3000; serverObservable.server.listen(port); console.log("Server listening on port: "+port); // HTTP request event loop function serverObservable.subscribe(function (data) { var req = data.request; console.log(req.headers); var res = data.response; res.writeHead(200, {'Content-Type':"text/html"}); res.end("hello world"); console.log("res content out"); }); // exceptiopn process.on('uncaughtException', function (err) { console.log(['Caught exception:', err.message].join(" ")); }); 

代码结束了一次性“hello world”输出到浏览器,RxServer停止响应另一个访问(brwoser重新加载等)。

我正在学习RxJS的方法,但在网上find的文档很less。

告诉我代码有什么问题,如果你知道更好的实现,请分享。 谢谢。

在rx_http.js中使用Rx.Subject而不是Rx.AsyncSubject。

AsyncSubjectcachingonNext()的最后一个值,并在完成时传播给所有观察者。 AsyncSubject

 exports.createServer = function () { var subject = new Rx.Subject(); var observable = subject.asObservable(); observable.server = http.createServer(function (request, response) { subject.onNext({ request:request, response:response }); }); return observable; }; 

当第一个请求到达时,调用完成的主题结束可观察序列。 你可以请删除该行再试一次。

我希望它有帮助。

艾哈迈德·阿里·阿卡斯