Tag: rxjs5

RxJS去嵌套callback

我想做一些事情: Rx.Observable.of(userToken) .flatMap(verifyToken) .flatMap(getUserInformation) .flatMap(createUser) .flatMap(signNewToken) .subcribe({ next: result => useResult(result), error: error => handleError(error) }) 我试图避免的是一个callback混乱。 在我的代码中,像verifyToken这样的verifyToken是Observables,我想链接它们。 这种模式是否正确? 因为现在只要其中一个内部做observer.error(new Error('problem')) ,链崩溃,我的error handling程序不会被调用。 我怎样才能改善这个?

RxJS – 等待5个事件发生,然后再继续

我有一个简单的RxJS5观察像这样: function foo(){ return Rx.Observable.create(obs => { obs.next(); }); } 可观察的是这样创build的: const x = foo(); function bar(){ return someObs() .flatMap(() => x.wait(5)) // wait for x to fire 5 times .map(v => ({z:v})); } 我想要做的就是等待X个可观察对象中的5个事件发生,然后再继续。 我不认为这个运营商是我想要的,所以我只是叫运营商“ wait ” 我怎样才能用RxJS5做到这一点?

Observable的简单testing结果与柴和摩卡nodejs

我正在开发一个应用程序使用Nodejs,RxJS和Typescript。 该应用程序有一个返回一个string的Observable的函数 myObsFunction() : Observable<string> { … do stuff } 我想能够做一个简单的testing来检查,当我订阅这个函数,我得到预期的string。 我使用柴和摩卡 ,所以我写下面的testing用例 import { expect } from 'chai'; import 'mocha'; import {myObsFunction} from './my-source-file'; describe('myObsFunction function', () => { it('check myObsFunction', () => { const expectedString = 'abc'; let receivedString: string; myObsFunction().subscribe( data => receivedString = data, error => console.error(error), () => expect(receivedString).to.equal(expectedString) ) […]

无法从连接的Observable提取/解包值

我有这个链应该将10个Observable链接成1个Observable,其中10个Observable中的每一个基本上都应该被解开为一个整数: const Rx = require('rxjs'); var i = 0; const obs = Rx.Observable.interval(10) .map(() => i++) .map(val => Rx.Observable.create(obs => { obs.next(val) })) .take(10) .reduce((prev, curr) => { return prev.concat(curr); // concat all observables }) .last(val => val.flatMap(inner => inner)); // subscribe to Observable obs.subscribe(v => { console.log('\n next (and only) result => \n', v); […]

使用RxJS5设置费率

我有这个代码只读取.csv文件中的数据并将其转换为json并logging数据: const fs = require('fs'); const path = require('path'); const sd = path.resolve(__dirname + '/fixtures/SampleData.csv'); const strm = fs.createReadStream(sd).setEncoding('utf8'); const Rx = require('rxjs/Rx'); const csv2json = require('csv2json'); const dest = strm .pipe(csv2json({ separator: ',' })); dest.on('error', function(e){ console.error(e.stack || e); }) const obs = Rx.Observable.fromEvent(dest, 'data') .flatMap(d => Rx.Observable.timer(100).mapTo(d)) obs.subscribe(v => { console.log(String(v)); }) 代码正在做的是在100毫秒延迟后logging所有的数据。 […]

rxjs在错误之后立即完成观察,而不是继续

我试图通过一个observable来传输一组内容,并在第一个错误之后停止。 把它看作一系列的项目是简单的,因为它的行为是一样的。 我正在创build一个项目的数组观察 将每个项目映射到一个URL 将URL称为请求承诺 执行一个catch(),在发生错误时返回一个observable.empty() 使用RxJS 5: rx.Observable.from(array) .map(self.createUrl) .flatMap(x => { var options = { uri: url, headers: { "Content-Type": "application/json" }; return rx.Observable.fromPromise(request-promise(options)); }) .catch(() => { return rx.Observable.empty();}) .subscribe( x => console.log('success:', x), e => console.log('error'), () => console.log('complete')); 在执行此序列时,遇到第一个错误后代码将停止。 我怀疑在#4中的空观察是终止观察,但我不知道为什么。 我期望的过程是无论错误如何处理数组中的所有项目 – 最终处理所有成功的项目,并在每个错误后恢复。

Promise.resolve()但是Observables(RxJS5)

好了,为了更好地学习RxJS,我决定尝试创build一个自定义的Rx操作符。 所以这是一个简单的工作正常: Rx.Observable.prototype.multiply = function (input) { const source = this; return Rx.Observable.create(function (obs) { return source.subscribe(function(val){ obs.next(input*val); }); }); }; 我们可以像这样使用它: const obs = Rx.Observable.interval(1000) .multiply(4) .forEach(function (v) { console.log(v); }); 然而,如果我们得到一些更复杂的东西,例如,如果我们的操作符采用一个函数而不是一个静态值。 Rx.Observable.prototype.handleFn = function (fn) { const source = this; return Rx.Observable.create(function (obs) { return source.subscribe(function(val){ obs.next(fn.call(obs,val)); }); }); }; 上面的内容都很好, 但是如果我们需要处理从input函数返回的Rx.Observable,如下所示: const […]

节点/angular2:用户login后更新导航栏和显示侧栏

这是我第一个使用Angular 2的项目/应用程序。我目前被困在应用程序的login问题中。 所以,在我的networking应用程序: 期望: 我第一次加载一个login组件。 在这种情况下,导航栏显示“login”button。 正确login后,我可以转到仪表板组件。 在仪表板组件中,我可以在左侧看到侧边栏,我的导航栏更新为显示“注销”button。 当我点击“退出”时,它应该优雅地签约我。 为了更清晰的图片,我已经上传了一张专辑,显示了login和仪表盘的外观以及login后的实际情况。 现实: 我使用Angular 2的localStorage将当前用户存储在关键的currentUser并检测用户是否login。只要在当前用户键中没有值,login组件就可以正常工作。 否则,它会显示一个没有标题的空白仪表板。 login后,侧边栏和更新的导航栏与login屏幕混杂在一起,而仪表板不显示。 我已经做了一个临时的Git仓库 ,现在向你展示我的代码。 如果你注意到,Angular前端连接到一个Node.js后端。 Node.js后端工作得很好。 我认为主要的问题在于我使用Observable(告诉用户login的每个订阅组件),路由上的authentication保护以及authentication服务本身。 为了满足上面的期望,我试图使用Observables并将值发送到订阅组件。 不过,上面提到的“现实”一点也发生了,我还不知道还有什么办法可以解决。 那么,你能帮我看看问题出在哪里,我能做些什么来解决问题? 我将不胜感激您的意见,谢谢。

我怎样才能缓冲一个简单的Rxjs观察?

我想通过一些任意的(但是简单的)标准来缓冲一个观察值。 我在这里设置了一个简单的例子: const observable = Rx.Observable.from([1,2,3]) const filtered = observable.filter((n) => n === 3); observable .buffer(filtered) .subscribe((n) => { // Why is this empty? console.log(n); }); 宾在这里 试图filter做这只会产生一个空的数组。 我期待[1,2,3]的数组,但是这似乎不是如何工作的。 所有缓冲区的文档使用asynchronous事件,如计时器,但这不是我想要的。 我只想根据我自己决定的任意标准来取最后的n个项目。 帮助非常感谢!