Node.js 原生 MongoDB 驱动程序的连接池问题

我使用 Node.js + RxJS + MongoDB 搭建 socket.io 服务器。在处理了一定数量的请求之后,到数据库的连接池会变得非常大,文件描述符始终得不到释放,最终导致服务器关闭。

对于db查询我使用下面的代码:

/* @flow */ var { Observable } = require('rx'); var client = require('mongodb'); var { assign } = require('lodash'); var __DEV__ = process.env.NODE_ENV !== 'production'; var URL = 'my database url'; class QueryBuilder { _db$: Observable; _selectors: Object; constructor(db$: Observable, selectors?: Object) { this._db$ = db$; if (!selectors) { this._selectors = assign({ collection: null, query: {}, opts: {}, sort: {}, offset: 0, limit: 0 }, selectors); } else { this._selectors = selectors; } } static connect(url: string): QueryBuilder { var connect = Observable.fromNodeCallback(client.connect, client); var db$ = connect(db.url); db$.subscribe( _ => { if (__DEV__) { console.log('Connected to database on', url); } }, err => console.log('Database connection error:', err.message, err.stack) ); return new QueryBuilder(db$); } close(): any { this._db$.dispose(); return QueryBuilder; } collection(name: string): QueryBuilder { var ss = assign({}, this._selectors, { collection: name }); return new QueryBuilder(this._db$, ss); } select(query?: Object = {}, opts?: Object = {}): QueryBuilder { var ss = assign({}, this._selectors, { query, opts }); return new QueryBuilder(this._db$, ss); } selectOne(query?: Object = {}, opts?: Object = {}): QueryBuilder { var ss = assign({}, this._selectors, { query: query, opts: opts, limit: 1 }); return new QueryBuilder(this._db$, ss); } sort(sort: Object): QueryBuilder { var ss = assign({}, this._selectors, { sort }); return new QueryBuilder(this._db$, ss); } skip(offset: number): QueryBuilder { var ss = assign({}, this._selectors, { offset }); return new QueryBuilder(this._db$, ss); } limit(limit: number = 0): QueryBuilder { var ss = assign({}, this._selectors, { limit }); return new QueryBuilder(this._db$, ss); } exec(): Observable { var ss = this._selectors; var db$ = this._db$ if (!ss.collection) return Observable.throw('You have to provide collection name.'); if (!db$) return Observable.throw('No db connection found.'); var o = 
db$.flatMapLatest(db => { var c = db.collection(ss.collection); var cursor = c.find(ss.query, ss.opts).sort(ss.sort).skip(ss.offset).limit(ss.limit); var obs = Observable.fromNodeCallback(cursor.toArray, cursor); return obs(); }); return ss.limit === 1 ? o.map(res => res[0]) : o; } } module.exports = QueryBuilder.connect(URL); 

这里的问题在哪里?

您在 connect 方法中创建的订阅(subscription)发生了泄漏,而且您调用 dispose 的对象是 observable,而不是订阅本身。

 class QueryBuilder { _db$: Observable; _sub: Disposable; _selectors: Object; constructor(db$: Observable, selectors?: Object) { this._db$ = db$; this._sub = db$.subscribe( _ => { if (__DEV__) { console.log('Connected to database on', url); } }, err => console.log('Database connection error:', err.message, err.stack) ); if (!selectors) { this._selectors = assign({ collection: null, query: {}, opts: {}, sort: {}, offset: 0, limit: 0 }, selectors); } else { this._selectors = selectors; } } static connect(url: string): QueryBuilder { var connect = Observable.fromNodeCallback(client.connect, client); var db$ = connect(db.url); return new QueryBuilder(db$); } close(): any { this._sub.dispose(); return QueryBuilder; } } 

(未经测试)