RxJs避免外部状态,但仍然访问以前的值

我正在使用RxJs来听一个amqp queu(不是真正相关的)。

我有一个函数createConnection返回一个Observable发射新的连接对象。 一旦我有连接,我希望每1000毫秒通过它发送消息,10个消息后,我想closures连接。

我试图避免外部状态,但如果我不把连接存储在一个外部variables,我怎么能closures它? 看到我从连接开始,然后flatMap和推消息,所以几个链后,我不再有连接对象。

这不是我的stream量,但想象这样的事情:

 createConnection() .flatMap(connection => connection.createChannel()) .flatMap(channel => channel.send(message)) .do(console.log) .subscribe(connection => connection.close()) <--- obviously connection isn't here 

现在我明白这样做很愚蠢,但是现在如何访问连接呢? 我当然可以从var connection = createConnection()

后来以某种方式join。 但是,我该怎么做呢? 我甚至不知道如何正确地提出这个问题。 底线,我有一个可观察的,发出连接,打开连接后,我想要一个观察,每1000毫秒发出一个消息(带一个take(10) ),然后closures连接

你的问题的直接答案是“你可以通过每一步”。 例如,您可以replace这一行

 .flatMap(connection => connection.createChannel()) 

与这一个:

 .flatMap(connection => ({ connection: connection, channel: connection.createChannel() })) 

并保留对连接的访问​​权限。

但还有另一种方法可以做你想做的事情。 假设你的createConnection和createChannel函数如下所示:

 function createConnection() { return Rx.Observable.create(observer => { console.log('creating connection'); const connection = { createChannel: () => createChannel(), close: () => console.log('disposing connection') }; observer.onNext(connection); return Rx.Disposable.create(() => connection.close()); }); } function createChannel() { return Rx.Observable.create(observer => { const channel = { send: x => console.log('sending message: ' + x) }; observer.onNext(channel); // assuming no cleanup here, don't need to return disposable }); } 

createConnection (和createChannel ,但我们将专注于前者)返回一个冷观察; 每个用户将得到自己的连接stream,包含一个单一的连接,当这个订阅到期,处理逻辑将被自动调用。

这可以让你做这样的事情:

 const subscription = createConnection() .flatMap(connection => connection.createChannel()) .flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i }))) .take(10) .subscribe(x => x.channel.send(x.data)) ; 

您实际上并不需要处理订阅以进行清理; take(10)满意后,整个链条将完成并引发清理。 您需要明确地处理订阅的唯一原因是如果您希望在10个1000毫秒的时间间隔之前拆分它,

请注意,这个解决scheme还包含了一个直接回答你的问题的实例:我们把这个通道放在这个线上,所以我们可以在传递给订阅调用的onNext lambda中使用它(通常在这样的代码出现的地方)。

以下是整个工作: https : //jsbin.com/korihe/3/edit?js,console,output

这个代码给了我一个错误,因为flatmap等待一个可观察的<(T)>({connection:connection,channel:connection.createChannel()} )它是一个对象。

.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))

相反,你可以使用combineLatest操作符

.flatMap(connection => Observable.combineLatest( Observable.of(connection), connection.createChannel(), (connection, channel) => { ... code .... });