Tag: rxjs

Observable的简单testing结果与柴和摩卡nodejs

我正在开发一个应用程序使用Nodejs,RxJS和Typescript。 该应用程序有一个返回一个string的Observable的函数 myObsFunction() : Observable<string> { … do stuff } 我想能够做一个简单的testing来检查,当我订阅这个函数,我得到预期的string。 我使用柴和摩卡 ,所以我写下面的testing用例 import { expect } from 'chai'; import 'mocha'; import {myObsFunction} from './my-source-file'; describe('myObsFunction function', () => { it('check myObsFunction', () => { const expectedString = 'abc'; let receivedString: string; myObsFunction().subscribe( data => receivedString = data, error => console.error(error), () => expect(receivedString).to.equal(expectedString) ) […]

RxJS node-oauth简单的获取请求失败

处理一个简单的(相对)RxJS演示/testing代码,这个演示/testing代码将Twitter的一个API调用和一个MongoDB请求压缩成一个单独的observable。 为了能够做出一个Twitter的Api调用,你需要通过Oauth进行身份validation。 我find了node-oauth库来validation节点请求: var oauth = new OAuth.OAuth( 'https://api.twitter.com/oauth/request_token', 'https://api.twitter.com/oauth/access_token', 'key', 'secret', '1.0A', null, 'HMAC-SHA1' ); 然后我用Oauth get创build一个新的Observable Oauth get方法从Twitter获取地点: var twitterStream = Rx.Observable.fromNodeCallback(oauth.get)('https://api.twitter.com/1.1/trends/place.json?id=23424977', 'access-token', 'token'); 来自MongoDB的另一个stream: Rx.Node.fromStream(Order.find().stream()); 最后一步压缩他们: var getNewList = Rx.Observable .zip(flickrStream, orderStream, function(user, order) { return {user: user, order: order}; }); getNewList.subscribe(function(response) { // render `response` console.log(response); }); 尽pipe代码似乎对我来说,当我通过节点执行它时,下面的消息被打印到控制台: TypeError: Object #<Object> […]

从两个来源rx.js追赶订阅

我需要结合追赶和订阅新的饲料。 所以,首先我要查询数据库中所有我错过的新logging,然后切换到所有正在进入的新logging的pub sub。 第一部分很容易做你的查询,也许在500批次,这将给你一个数组,你可以rx.observeFrom。 第二部分很简单,你只需要在pubsub上放一个rx.observe。 但我需要按顺序进行,所以在我开始播放新歌曲之前,我需要播放所有的旧唱片。 我想我可以开始订阅pubsub,把这些放在一个数组中,然后开始处理旧的数据,当我完成后,删除dups(或者因为我做了dup检查)允许less数dups,但是播放积累的logging,直到他们走了,然后一个一个出来。 我的问题是做这件事的最好方法是什么? 我应该创build一个订阅,开始build立一个数组中的新logging,然后开始处理旧的,然后在oldrecord过程的“然后”订阅另一个数组? 好吧,这是我迄今为止。 我需要build立testing,并完成一些psudo代码,以确定它是否工作,更不用说是一个很好的实现。 在我埋葬自己之前,随时可以阻止我。 var catchUpSubscription = function catchUpSubscription(startFrom) { EventEmitter.call(this); var subscription = this.getCurrentEventsSubscription(); // calling map to start subscription and catch in an array. // not sure if this is right var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x); // getPastEvents gets batches of 500 iterates over […]

使用RxJs遍历node.js中的目录树

我正在尝试使用RxJs和node.js来遍历目录树。 我想出了工作解决scheme: const filesInDir = Rx.Observable.fromNodeCallback(fs.readdir) const statFile = Rx.Observable.fromNodeCallback(fs.stat) const listFiles = (prefix, dir = '') => { const file$ = filesInDir(`${prefix}/${dir}`) .flatMap(file => file) .filter(file => !file.startsWith('.')) const isDir$ = file$ .map(file => statFile(`${prefix}/${dir}/${file}`)) .flatMap(file => file) .map(file => file.isDirectory()) return file$ .zip(isDir$, (file, isDir) => {return {file, isDir}}) .map(f => { if […]

为什么这个简单的rxjs例子不像预期的那样运行

我想更深入地理解反应式编程的基础知识,所以我在这里通过示例: http : //reactivex.io/rxjs/manual/tutorial.html#creating-observables 我只是在文本编辑器中input它们并通过节点在terminal中运行它们。 第一个例子没有产生任何输出(第二个工作,所以这不是加载库的问题)。 我期待它输出“富”,但我什么都没有得到。 为什么? var myObservable = Rx.Subject.create(); myObservable.subscribe(value => console.log(value)); myObservable.next('foo');

RxJS节点等待观察完成,然后按顺序订阅下一个

我试图填充我的数据库从谷歌表格中刮取的值。 hasuraHelper.insert()向我的数据库发送一个HTTP邮件。 我怎样才能使一个insert()只在前面的api调用返回之后被调用? 我认为concatMap可以做到这一点,但似乎仍然热切地赞同所有的排放。 当前:请求1 – >请求2 – > … – >请求1完成 – >请求2完成 – > … 我想要的:请求1 – >请求1完成 – >请求2 – >请求2完成 – > … 这是相关的代码: sheetsHelper.authToken() .flatMap(sheetsHelper.get) .flatMap(response => response.values) //values is a 2d array .map(row => { console.log(row[0]); const flights = []; //add 100-500 objects to this array return flights; }) […]

rxjs,节点,处理程序中的订阅内存泄漏

假设你有一个像这样的express样式处理函数: (req, res, next) => { const requestedProgramIDs = req.body.programIDs; // assume all data is in a cache observable cache$.filter(data => requestedProgramIDs.indexOf(data.id) >= 0) .toArray() .subscribe(programs => res.end(JSON.stringify(programs))) }; 我的问题是这个订阅会泄漏内存吗? 我担心,如果我只是unsubscribefunction的结束,那么不能保证订阅代码将执行。 我的解决scheme思路如下: 使用.take(1)确保取消订阅 如果observable的scheduler是同步的,那么取消订阅将是同步的,我可以在函数的末尾加上它。 问题: 我是否认为订阅会泄漏? 如果,那么你会如何build议我处理(防止)泄漏?

rxjs在错误之后立即完成观察,而不是继续

我试图通过一个observable来传输一组内容,并在第一个错误之后停止。 把它看作一系列的项目是简单的,因为它的行为是一样的。 我正在创build一个项目的数组观察 将每个项目映射到一个URL 将URL称为请求承诺 执行一个catch(),在发生错误时返回一个observable.empty() 使用RxJS 5: rx.Observable.from(array) .map(self.createUrl) .flatMap(x => { var options = { uri: url, headers: { "Content-Type": "application/json" }; return rx.Observable.fromPromise(request-promise(options)); }) .catch(() => { return rx.Observable.empty();}) .subscribe( x => console.log('success:', x), e => console.log('error'), () => console.log('complete')); 在执行此序列时,遇到第一个错误后代码将停止。 我怀疑在#4中的空观察是终止观察,但我不知道为什么。 我期望的过程是无论错误如何处理数组中的所有项目 – 最终处理所有成功的项目,并在每个错误后恢复。

听stream言承诺

我一直在阅读有关Observables 。 有一件事我读到,就是它能够像stream一样听。 我试着用下面的代码。 const Rx = require('rxjs/Rx'); Promise = require('bluebird'); let i = 0; function calculate() { return new Promise((resolve, reject) => { setInterval(() => { console.log(++i); resolve(i); }, 5000); }) } let source = Rx.Observable.fromPromise(calculate()); source.subscribe( next => { console.log(next, ' next '); }, err => { console.error(err) }, () => { console.log('done!'); […]

RxJS:将Node.js套接字转换为Observable并将它们合并成一个stream

我试图将Node的套接字转换为使用RxJS的stream。 目标是让每个套接字创build它自己的stream,并将所有stream合并成一个。 当新的套接字连接时,将使用socketStream = Rx.Observable.fromEvent(socket, 'message')创build一个stream。 然后这个stream被合并到一个类似的主stream中 mainStream = mainStream.merge(socketStream) 这似乎工作正常,问题是,200-250客户端连接后,服务器引发RangeError: Maximum call stack size exceeded 。 我有示例服务器和客户端代码,在这里主要演示此行为: 示例服务器和客户端 我怀疑,随着客户端连接/断开连接,主stream不能正确清理。