Tag: rxjs

如何在rxjs中使用节点的变换stream?

我一直在玩rxjs一段时间,我喜欢如何使用它的运算符逻辑,而不是命令式编程。 不过,我也喜欢节点的stream,这也是高度可组合的,所以我明显的反应是使用它们两个,但我没有看到它被提及了很多(实际上,我根本没有),除了绑定在rxjs的书 。 所以,我的问题确实是,我如何利用RxJS中的所有转换stream? 或者,甚至有可能? 例:- var fs = require('fs'); var csv = require('csv-parse')({delimiter:';'}); var src = fs.createReadStream('./myFile.csv'); src.pipe(csv).pipe(process.stdout); 基本上,我想这样做: – var fs = require('fs'); var csv = require('csv-parse')({delimiter:';'}); var rx= require('rx-node'); var src = fs.createReadStream('./myFile.csv'); var obj = rx.fromReadableStream(src); obj.pipe(csb).map(x=>console.log(x)); 过去我被告知使用highland ,但是我在这里严格地寻找rxjs解决scheme。

Rxjs如何知道有多less可观察的用户?

当我有一个可观察的多个订阅者,例如: const myInterval = Rx.Observable.interval(500); const subscriptionOne = myInterval.subscribe(val => doSomething()); const subscriptionTwo = myInterval.subscribe(val => doSomething()); 我怎么知道有多less订户仍然注册到myInterval可观察的? 我需要这个信息,例如, 以防万一我忘了取消订阅其中之一的内存泄漏 ?

使用Javascript进行反应式编程

我在反应式编程方面有了新的东西,而且我可能会丢失所有这些我无法理解的文章。 其实,我是来自Nodejs,Angularjs,Angular 2和React的JavaScript开发人员。 我做的事 我始终使用promise,用于远程数据读取,本地asynchronousparsing等…更好的可testing性比callback,并符合我的需求。 我理解使用stream 除了特殊的情况外,我无法弄清楚在哪里可以解救我。 这种特殊情况是因为承诺只能解决一次,所以在听stream时我不能使用承诺。 SocketIo的一个例子: io.on('connection', (socket) => { // this works }); io.on('connection').then((socket) => { // this can't work, promise would be resolved only once }); 如果我没有错,我可以使用反应stream来处理这个案件,只需返回一个可观察的。 对 ? 我不明白 我正在学习Angular 2和所有的东西。 实际上,从许多博客,人们使用observables来获取远程数据,我不明白什么可能是使用它的优势,而不是承诺。 事实是,我需要在这两种情况下做一个遥控器,为什么比另一个更多? 这是一个性能问题? 我需要的 如果您已经阅读了整个问题,我需要的是了解在远程数据获取情况下使用响应式编程而不是承诺的优点是什么? 在哪些情况下(其他情况下)比平时使用react native的东西更好?

试图让我自己的RxJs可观察

我试图转换现有的API与RxJS一起工作…对于节点来说相当新,对于RxJs来说也很新,所以请耐心等待。 我有一个现有的API(getNextMessage),它可以阻塞(asynchronous),或者当某些东西变得可用时,通过节点样式(err,val)callback返回一个新的项目或错误。 所以它看起来像这样: getNextMessage(nodeStyleCompletionCallback); 你可以把getNextMessage想象成一个http请求,在将来当服务器响应时,这个请求就会完成,但是一旦收到消息,你需要重新调用getNextMessage,以便不断从服务器获取新的项目。 所以,为了使它成为一个可观察的集合,我必须让RxJs继续调用我的getNextMessage函数,直到用户被处置(); 基本上,我正在尝试创build自己的RxJs可观察集合。 问题是: 我不知道如何使subscriber.dispose()杀死async.forever 我可能不应该首先使用async.forever 我不确定我应该甚至为每条消息“完成” – 不应该在一个序列的末尾 我想最终删除使用fromNodeCallback的需要,有一个一stream的RxJS可观察 显然我有点困惑。 会爱一点帮助,谢谢! 这是我现有的代码: var Rx = require('rx'); var port = require('../lib/port'); var async = require('async'); function observableReceive(portName) { var observerCallback; var listenPort = new port(portName); var disposed = false; var asyncReceive = function(asyncCallback) { listenPort.getNextMessage( function(error, json) { observerCallback(error, json); if […]

将Observable实现到持久队列库中

目前正在编写一个小的持久队列库,将读/写行到一个文本文件。 这里是add方法,例如: Queue.prototype.add = function(line, cb){ getLock(this, err => { if(err){ this.emit('error', err); releaseLock(err, cb); } else{ fs.appendFile(this.filepath, line, err => { err && this.emit('error', err); releaseLock(err, cb); }); } }); }; 我觉得很尴尬,支持事件发射器和callback(或事件发射器和承诺)。 换句话说,对于队列中的每个方法(add,peek,remove),我需要返回/callback每个调用所特定的结果。 仅使用事件发射器意味着调用者可能会针对他们刚才所做的调用不是特定的结果。 所以callback或承诺似乎是必要的 – 你不能只使用事件发射器。 我想知道的是 – 可以观察到不知何故解决必须与事件发射器或事件发射器的承诺配对callback的问题? 我正在寻找一种方法来实现这种只有一种typesasynchronouscallback机制的evented /asynchronous队列。 也许observables不是这里的答案,但我正在寻找一个好的devise模式。

根据rxjs中的时间处理一系列事件

我有一个过程,每隔一段时间发送一个数据包,我需要根据数据包到达的时间来pipe理这个数据stream,等等。 在某些时候,我也closuresstream和过程。 现在,我使用一组定时器来做这件事,但是我希望我可以用rxjs因为它看起来非常适合这种事情。 到目前为止,我还没有成功。 问题 stream应该定期发送数据包,但通常会偏离很多,有时会卡住。 在下列情况下,我想在某一时刻closuresstream: 如果需要比startDelay更多的时间发送第一个数据包。 在发送第一个数据包之后,如果两个数据包之间存在多于middleDelay的暂停。 经过一个恒定的时间段maxChannelTime 。 由于上述任何一种原因,当我即将closures这个stream时,我首先要求它有礼貌地closures,这样可以进行一些清理工作。 有时它也会在清理过程中给我发送一个最后的数据包。 但是我想等待cleanupTime的清理和最后的数据到达之前,我closures了stream,忽略任何更多的消息。 精 我将通过用Observable包装事件来创build“stream”。 我没有这样做的麻烦。 通过“closures”stream,我的意思是告诉stream程停止发送数据,并可能closures(即死亡)。

RxJS + node.js HTTP服务器实现?

在RxJS实现之前,我一直使用node.js。 这是我的试用码学习 – Reactive-Extensions / rxjs-node https://github.com/Reactive-Extensions/rxjs-node rx_http.js (node.js的http lib的RxJS包装器) var Rx = require("./rx.min"); var http = require("http"); for(var k in http) { exports[k] = http[k]; } exports.createServer = function () { var subject = new Rx.AsyncSubject(); var observable = subject.asObservable(); observable.server = http.createServer(function (request, response) { subject.onNext({ request:request, response:response }); subject.onCompleted(); }); return observable; […]

如何在公共事件属性上join两个事件stream?

考虑以下两个事件stream。 每个事件都有一个timestamp / ts和value属性。 我想要将这两个事件具有相同时间戳的stream合并到一个具有应用的值转换的结果stream中。 如果一个数据stream缺less一个时间戳(例如下面例子中的黄色ts=3 ),那么该时间戳应该被忽略。 想用诸如xstream或rxjs之类的反应式编程库来解决这个问题。 我对于反应式编程的概念相当陌生,但是如果有人有另外一个build议,我就会全神贯注。 谢谢!

如何在没有主题的情况下将WS Server正确转向RXJS Api

在Node.js中将着名的ws模块转换为响应式api的正确方法是什么? 我明白,科目可以帮助桥接非react native事件和react native事件,但是他们的问题在于,处理其相关对象的时间要困难得多。 var WebSocketServer = require('ws').Server; var wss = new WebSocketServer({ port: 8080 }); var Rx = require('rx'); var connectionMessageSubject = new Rx.Subject(); wss.on('connection', function connection(client) { ws.on('message', function incoming(message) { connectionMessageSubject.onNext({ client: client, message: message }); }); }); 我不能使用它们内置的fromEvent方法,因为它注册了许多不同的事件,当30个或更多的客户端连接时,NodeJS会发出警告。 例如… var WebSocketServer = require('ws').Server; var wss = new WebSocketServer({port:8080}); var connectionMessageObservable; //this […]