我如何使用事件和承诺来控制程序stream?
我有一个这样的class级:
import net from 'net'; import {EventEmitter} from 'events'; import Promise from 'bluebird'; class MyClass extends EventEmitter { constructor(host = 'localhost', port = 10011) { super(EventEmitter); this.host = host; this.port = port; this.socket = null; this.connect(); } connect() { this.socket = net.connect(this.port, this.host); this.socket.on('connect', this.handle.bind(this)); } handle(data) { this.socket.on('data', data => { }); } send(data) { this.socket.write(data); } }
我将如何将send
方法变成一个承诺,从套接字的data
事件返回一个值? 当数据发送给服务器时,服务器只发回数据,而不是一个容易被抑制的连接消息。
我试过类似的东西:
handle(data) { this.socket.on('data', data => { return this.socket.resolve(data); }); this.socket.on('error', this.socket.reject.bind(this)); } send(data) { return new Promise((resolve, reject) => { this.socket.resolve = resolve; this.socket.reject = reject; this.socket.write(data); }); }
显然,这是行不通的,因为当链接和/或调用多次并行send
时, resolve
/ reject
会相互覆盖。
还有两个并行调用send
的问题,它解决了哪个响应首先返回。
我目前有一个实现使用队列和推迟,但它感觉混乱,因为队列不断被检查。
我希望能够做到以下几点:
let c = new MyClass('localhost', 10011); c.send('foo').then(response => { return c.send('bar', response.param); //`response` should be the data returned from `this.socket.on('data')`. }).then(response => { console.log(response); }).catch(error => console.log(error));
只是添加,我没有任何控制收到的数据,这意味着它不能被修改的stream之外。
编辑 :所以这似乎是不可能的,由于TCP没有请求 – 响应stream程。 这怎么可以仍然使用承诺来实现,但是要么使用单个执行(一次一个请求)承诺链或队列。
我把问题解决到最低限度,并使浏览器可运行:
- 套接字类被嘲笑。
- 从
EventEmitter
删除有关端口,主机和inheritance的EventEmitter
。
该解决scheme通过将新请求附加到承诺链,但在任何给定时间点允许最多一个开放/未答复的请求。 .send
每次调用时.send
返回一个新的promise,并且该类负责所有的内部同步。 所以.send
可以被多次调用,并且保证了请求处理的正确顺序(FIFO)。 我添加的一个附加function是修剪承诺链,如果没有挂起的请求。
警告我完全省略了error handling,但是无论如何都要根据你的特定用例量身定做。
DEMO
class SocketMock { constructor(){ this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); this.listeners = { // 'error' : [], 'data' : [] } } send(data){ console.log(`SENDING DATA: ${data}`); var response = `SERVER RESPONSE TO: ${data}`; setTimeout( () => this.listeners['data'].forEach(cb => cb(response)), Math.random()*2000 + 250); } on(event, callback){ this.listeners[event].push(callback); } } class SingleRequestCoordinator { constructor() { this._openRequests = 0; this.socket = new SocketMock(); this._promiseChain = this.socket .connected.then( () => console.log('SOCKET CONNECTED')); this.socket.on('data', (data) => { this._openRequests -= 1; console.log(this._openRequests); if(this._openRequests === 0){ console.log('NO PENDING REQUEST --- trimming the chain'); this._promiseChain = this.socket.connected } this._deferred.resolve(data); }); } send(data) { this._openRequests += 1; this._promiseChain = this._promiseChain .then(() => { this._deferred = Promise.defer(); this.socket.send(data); return this._deferred.promise; }); return this._promiseChain; } } var sender = new SingleRequestCoordinator(); sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); setTimeout(() => sender.send('data-4') .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
如果您的send()
调用相互混淆,则应将其保存到caching中。 为了确保收到的消息与发送的消息相匹配,您应该为每个消息分配一些唯一的id
到有效负载中。
所以你的消息发送者将看起来像这样
class MyClass extends EventEmitter { constructor() { // [redacted] this.messages = new Map(); } handle(data) { this.socket.on('data', data => { this.messages.get(data.id)(data); this.messages.delete(data.id); }); } send(data) { return return new Promise((resolve, reject) => { this.messages.set(data.id, resolve); this.socket.write(data); }); } }
这个代码对于消息顺序是不明智的,你会得到你想要的API。
socket.write(data[, encoding][, callback])
接受一个callback。 您可以拒绝或解决此callback。
class MyClass extends EventEmitter { constructor(host = 'localhost', port = 10011) { super(EventEmitter); this.host = host; this.port = port; this.socket = null; this.requests = null; this.connect(); } connect() { this.socket = net.connect(this.port, this.host); this.socket.on('connect', () => { this.requests = []; this.socket.on('data', this.handle.bind(this)); this.socket.on('error', this.error.bind(this)); }); } handle(data) { var [request, resolve, reject] = this.requests.pop(); // I'm not sure what will happen with the destructuring if requests is empty if(resolve) { resolve(data); } } error(error) { var [request, resolve, reject] = this.requests.pop(); if(reject) { reject(error); } } send(data) { return new Promise((resolve, reject) => { if(this.requests === null) { return reject('Not connected'); } this.requests.push([data, resolve, reject]); this.socket.write(data); }); } }
没有testing,因此不知道方法签名,但这是基本的想法。
这假设每个请求会有一个handle
或error
事件。
我考虑的越多,在您的应用程序数据中没有额外的信息似乎也是不可能的,例如数据包号码匹配请求的响应。
现在它的实现方式(以及它在你的问题中的方式),甚至不能确定一个答案只能匹配一个handle
事件。