Tag: rxjs

尝试在JavaScript中重播GPX路线,但却在拖延航点的努力中挣扎

我有一些GPX文件,其中包含我想要在node.js脚本中重播的路线。 这是为了支持testing一个地理应用程序,所以我想重播与路线被捕获时相同的时差。 例如,如果我有这样的文件: <?xml version="1.0" encoding="UTF-8"?> <gpx version="1.1" creator="Runkeeper – http://www.runkeeper.com" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.topografix.com/GPX/1/1" xsi:schemaLocation="http://www.topografix.com/GPX/1/1 http://www.topografix.com/GPX/1/1/gpx.xsd" xmlns:gpxtpx="http://www.garmin.com/xmlschemas/TrackPointExtension/v1"> <trk> <name><![CDATA[Running 7/9/13 5:12 pm]]></name> <time>2013-07-09T22:12:45Z</time> <trkseg> <trkpt lat="46.414311000" lon="-94.356703000"><ele>386.7</ele><time>2013-07-09T22:12:45Z</time></trkpt> <trkpt lat="46.414328000" lon="-94.356708000"><ele>386.0</ele><time>2013-07-09T22:12:46Z</time></trkpt> <trkpt lat="46.414404000" lon="-94.356637000"><ele>385.6</ele><time>2013-07-09T22:12:49Z</time></trkpt> <trkpt lat="46.414486000" lon="-94.356562000"><ele>385.4</ele><time>2013-07-09T22:12:52Z</time></trkpt> … 我想要脚本处理第一个点,等待1秒,然后处理第二个点,在处理第三个点之前等待3秒,然后在处理第三个点之前再等待3秒…等等。 这可能并不重要,但是我正在使用这些数据并更新一个Firebase数据库,所以我需要对每个数据点进行其他“asynchronous”处理。 我可以将数据放入一个数组中,所以我试着用一些setTimeout调用来使用一个简单的foreach循环,但是我同时发射了一堆更新,每个更新都被延迟了一段时间。 然后我尝试使用如下所示的observables: // waypoints is an array of the … waypoints // Observable.from(waypoints) .zip(Observable.timer(0, 1000), x => […]

如何用RxJS编写简单的元stream

背景 这是我第一次尝试反应式编程。 我有一个不时接收数据缓冲区的stream。 当一个缓冲区以02开始时,意味着一个消息的开始,当它以03结束时,意味着消息的结束。 例: – – – 02 53 44 5a – – – 52 6a 72 7a – – – 62 32 62 0d – 0a 03 – – – – > 目的 我的目标是每次检测到一个消息完成时发出一个事件,以整个消息作为参数。 研究 阅读了几个关于响应式编程和阅读RxJS的 教程之后,我知道我想创build一个热门的可观察 。 通过我的研究,我相信我想要创造一个“元观察”或“元stream”,即可观察到的可观察到的stream或溪stream(可能是这样的混淆……)。 我的计划如下: 1 – 为所有进入2的缓冲区消息创build一个观测值 – 订阅步骤1,并创build一个观测值,检测消息的开始( 02 )和消息的结束( 03 )3 – 为步骤2订阅一些内容将照顾的数据。 所以,我的图如下所示: – […]

RxJS Observable from CLI和应用程序中的交互循环

我正在使用node.js构build一个简单的命令行工具。 在过去,我正在使用这样的Promise方法 function listenCommand(){ inquirer.prompt([{ type:'input', name:'value', message:"Enter commande :" }]).then(function (command) { processCmd(command); }); } function processCmd(){ … listenCommand() } 这将创build我的主循环input命令。 当命令执行时,应用程序将要求下一个。 我现在正在尝试将其转换为类似的RxJS方法。 function listenCommand(){ let listener = Rx.Observable.fromPromise(inquirer.prompt([{ type:'input', name:'value', message:"Enter commande :" }])); listener.subscribe(function (command) { processCmd(command); }); } function processCmd(){ … listenCommand() } 它的工作原理,但这听起来不太好。 用RxJS做这个快速循环的正确方法是什么? 还是RxJS不适合这种工作,我应该坚持承诺的方法呢?

RxJs – 当有可变数量的请求时,如何实现并行HTTP请求

我正在使用Express开发一个Node.js API,使用node-rest-client模块发出http请求。 其中一个开发的API端点是/api/v1/users/:userId ,它返回用户的全部信息,用户信息以及他所属部门的详细信息。 为了得到这个信息,有这个后端REST服务: /users/:userId – 返回带有用户信息和部门ID列表的JSON对象,例如: { "name" : "xxx", "departments" : [1, 5 ,6, 8] } /departments/:departmentId – 具有部门信息的JSON对象 { "id" : x, "name" : "xxx" } 调用/api/v1/users/1需要调用 GET /user/1 – > { "name" : "user1" , "departments" : [1, ,5 ,7 ,8]} 获取部门ID并对/departments/deparmentId进行n次调用 完成所有调用后,编写完整的JSON响应并将其返回。 我想使用RxJs将请求alignment,所以我想用Rx.Observable.zip()就足够了。 重点是,如果我有一个Observable数组,它的大小不固定,表示每个HTTP请求调用,我怎样才能调用Observable.zip() ? 如果在固定的数组中的元素的数量我会这样做: var observables = […]

Rx.js,订阅被调用undefined

我正在使用Rx.js将来自AJAX调用的结果stream式传输到多个单元。 但是,在观察者订阅MapObserver的时候,我遇到了更多的问题。 当第一个用户将总是得到正确的数据,但其余的将不明确。 this.observable = new Rx.Subject(); observeMap = this.observable .map(createMarker.bind(this)); var s1 = observeMap.subscribe(console.log.bind(console, 1)); var s2 = observeMap.subscribe(console.log.bind(console, 2)); 请指教,谢谢!

任何人都可以发布一个使用node.js查询postgresql数据库的RXJS示例?

任何人都可以发布一个使用node.js查询postgresql数据库的RXJS示例? 谷歌似乎没有任何关于此… 问候 肖恩。

RxJs如何观察对象属性的变化

我遇到了一个问题。 我有一个客户端,实例化具有布尔属性。 我想等到它切换到真,然后解决承诺。 我尝试了很多不同的方法,包括while循环,只是完全阻塞线程而不检查更新。 这是尝试 var startClient = function() { return new Promise((resolve, reject)=> { var client = CreateClient(); while(!client.ready) {} resolve(client); }); }; 我的问题是:有没有办法使用RxJs帮助..也许发射和事件,如果就绪属性更改? 下面是我想要做的更伪代码。 var startClient = function() { return new Promise((resolve, reject)=> { var client = CreateClient(); var emitter = Observable.watch(client, 'ready'); emitter.on('ready', function(result) { if(result) resolve(client); }); }; 有什么build议么? 谢谢!

如何创build一个RxJS缓冲区,将NodeJS中的元素进行分组,但不依赖于永远运行的时间间隔?

我在Rx.Observable.fromEvent中使用Rx.Observable.fromEvent捕获应用程序中的事件。 这些被发送到另一个服务器使用请求( https://www.npmjs.com/package/request )。 为了避免高networking负载,我需要在发送请求之间的给定超时时间内caching这些事件。 问题 使用bufferWithTime(200)将使节点进程保持运行,我不知道应用程序何时完成closuresstream。 有什么办法可以使用Rx缓冲区来说: 当元素1被按下时设定一个计时器 当元素2和3在定时器到期之前到达时,将它们推到数组[1,2,3](缓冲区) 计时器到期时,向pipe道发送[1,2,3]arrays。 如果元素4在计时器到期之后到来,那么设置一个新的计时器并重新开始。 如果没有元件被推动,则不启动将使过程退出的计时器。 我最初的做法是: Rx.Observable .fromEvent(eventEmitter, 'log') .bufferWithTime(200) // this is the issue .map(addEventsToRequestOption) .map(request) .flatMap(Promise.resolve) .subscribe(log('Response received'))

Reactive-Extensions / RxJS实现到node.js

我只是想实现 https://github.com/Reactive-Extensions/RxJS 到我的节点项目。 当然,有npm包可用,但我看到它更less,更less的模块,只使用最小。 文件,所以我想使用git来源的rxjs。 我下载了RxJS-master并将Dir下的所有文件复制到../myProject/lib/rx/ 我懂了 这些文件之间的rx.node.js var Rx = require('./rx'); require('./rx.aggregates'); require('./rx.binding'); require('./rx.coincidence'); require('./rx.experimental'); require('./rx.joinpatterns'); require('./rx.testing'); require('./rx.time'); module.exports = Rx; 所以, 我的app.js代码是这样的 var rx = require("./lib/rx/rx.node.js") function test() { var as = new rx.AsyncSubject() setTimeout(function () { as.onNext("works!") as.onCompleted() }, 500) return as } var a = test().subscribe(function (result) { console.log("Got result: " […]

RxJs避免外部状态,但仍然访问以前的值

我正在使用RxJs来听一个amqp queu(不是真正相关的)。 我有一个函数createConnection返回一个Observable发射新的连接对象。 一旦我有连接,我希望每1000毫秒通过它发送消息,10个消息后,我想closures连接。 我试图避免外部状态,但如果我不把连接存储在一个外部variables,我怎么能closures它? 看到我从连接开始,然后flatMap和推消息,所以几个链后,我不再有连接对象。 这不是我的stream量,但想象这样的事情: createConnection() .flatMap(connection => connection.createChannel()) .flatMap(channel => channel.send(message)) .do(console.log) .subscribe(connection => connection.close()) <— obviously connection isn't here 现在我明白这样做很愚蠢,但是现在如何访问连接呢? 我当然可以从var connection = createConnection() 后来以某种方式join。 但是,我该怎么做呢? 我甚至不知道如何正确地提出这个问题。 底线,我有一个可观察的,发出连接,打开连接后,我想要一个观察,每1000毫秒发出一个消息(带一个take(10) ),然后closures连接