我如何使用事件和承诺来控制程序stream?

我有一个这样的class级:

import net from 'net'; import {EventEmitter} from 'events'; import Promise from 'bluebird'; class MyClass extends EventEmitter { constructor(host = 'localhost', port = 10011) { super(EventEmitter); this.host = host; this.port = port; this.socket = null; this.connect(); } connect() { this.socket = net.connect(this.port, this.host); this.socket.on('connect', this.handle.bind(this)); } handle(data) { this.socket.on('data', data => { }); } send(data) { this.socket.write(data); } } 

我将如何将send方法变成一个承诺,从套接字的data事件返回一个值? 当数据发送给服务器时,服务器只发回数据,而不是一个容易被抑制的连接消息。

我试过类似的东西:

 handle(data) { this.socket.on('data', data => { return this.socket.resolve(data); }); this.socket.on('error', this.socket.reject.bind(this)); } send(data) { return new Promise((resolve, reject) => { this.socket.resolve = resolve; this.socket.reject = reject; this.socket.write(data); }); } 

显然,这是行不通的,因为当链接和/或调用多次并行send时, resolve / reject会相互覆盖。

还有两个并行调用send的问题,它解决了哪个响应首先返回。

我目前有一个实现使用队列和推迟,但它感觉混乱,因为队列不断被检查。

我希望能够做到以下几点:

 let c = new MyClass('localhost', 10011); c.send('foo').then(response => { return c.send('bar', response.param); //`response` should be the data returned from `this.socket.on('data')`. }).then(response => { console.log(response); }).catch(error => console.log(error)); 

只是添加,我没有任何控制收到的数据,这意味着它不能被修改的stream之外。

编辑 :所以这似乎是不可能的,由于TCP没有请求 – 响应stream程。 这怎么可以仍然使用承诺来实现,但是要么使用单个执行(一次一个请求)承诺链或队列。

我把问题解决到最低限度,并使浏览器可运行:

  1. 套接字类被嘲笑。
  2. EventEmitter删除有关端口,主机和inheritance的EventEmitter

该解决scheme通过将新请求附加到承诺链,但在任何给定时间点允许最多一个开放/未答复的请求。 .send每次调用时.send返回一个新的promise,并且该类负责所有的内部同步。 所以.send可以被多次调用,并且保证了请求处理的正确顺序(FIFO)。 我添加的一个附加function是修剪承诺链,如果没有挂起的请求。


警告我完全省略了error handling,但是无论如何都要根据你的特定用例量身定做。


DEMO

 class SocketMock { constructor(){ this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); this.listeners = { // 'error' : [], 'data' : [] } } send(data){ console.log(`SENDING DATA: ${data}`); var response = `SERVER RESPONSE TO: ${data}`; setTimeout( () => this.listeners['data'].forEach(cb => cb(response)), Math.random()*2000 + 250); } on(event, callback){ this.listeners[event].push(callback); } } class SingleRequestCoordinator { constructor() { this._openRequests = 0; this.socket = new SocketMock(); this._promiseChain = this.socket .connected.then( () => console.log('SOCKET CONNECTED')); this.socket.on('data', (data) => { this._openRequests -= 1; console.log(this._openRequests); if(this._openRequests === 0){ console.log('NO PENDING REQUEST --- trimming the chain'); this._promiseChain = this.socket.connected } this._deferred.resolve(data); }); } send(data) { this._openRequests += 1; this._promiseChain = this._promiseChain .then(() => { this._deferred = Promise.defer(); this.socket.send(data); return this._deferred.promise; }); return this._promiseChain; } } var sender = new SingleRequestCoordinator(); sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); setTimeout(() => sender.send('data-4') .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000); 

如果您的send()调用相互混淆,则应将其保存到caching中。 为了确保收到的消息与发送的消息相匹配,您应该为每个消息分配一些唯一的id到有效负载中。

所以你的消息发送者将看起来像这样

 class MyClass extends EventEmitter { constructor() { // [redacted] this.messages = new Map(); } handle(data) { this.socket.on('data', data => { this.messages.get(data.id)(data); this.messages.delete(data.id); }); } send(data) { return return new Promise((resolve, reject) => { this.messages.set(data.id, resolve); this.socket.write(data); }); } } 

这个代码对于消息顺序是不明智的,你会得到你想要的API。

socket.write(data[, encoding][, callback])接受一个callback。 您可以拒绝或解决此callback。

 class MyClass extends EventEmitter { constructor(host = 'localhost', port = 10011) { super(EventEmitter); this.host = host; this.port = port; this.socket = null; this.requests = null; this.connect(); } connect() { this.socket = net.connect(this.port, this.host); this.socket.on('connect', () => { this.requests = []; this.socket.on('data', this.handle.bind(this)); this.socket.on('error', this.error.bind(this)); }); } handle(data) { var [request, resolve, reject] = this.requests.pop(); // I'm not sure what will happen with the destructuring if requests is empty if(resolve) { resolve(data); } } error(error) { var [request, resolve, reject] = this.requests.pop(); if(reject) { reject(error); } } send(data) { return new Promise((resolve, reject) => { if(this.requests === null) { return reject('Not connected'); } this.requests.push([data, resolve, reject]); this.socket.write(data); }); } } 

没有testing,因此不知道方法签名,但这是基本的想法。

这假设每个请求会有一个handleerror事件。

我考虑的越多,在您的应用程序数据中没有额外的信息似乎也是不可能的,例如数据包号码匹配请求的响应。

现在它的实现方式(以及它在你的问题中的方式),甚至不能确定一个答案只能匹配一个handle事件。