Spaces:
Paused
Paused
| ; | |
| const process = require('process'); | |
| const Pool = require('./pool.js'); | |
| const PoolConfig = require('./pool_config.js'); | |
| const Connection = require('./connection.js'); | |
| const EventEmitter = require('events').EventEmitter; | |
| /** | |
| * Selector | |
| */ | |
| const makeSelector = { | |
| RR() { | |
| let index = 0; | |
| return clusterIds => clusterIds[index++ % clusterIds.length]; | |
| }, | |
| RANDOM() { | |
| return clusterIds => | |
| clusterIds[Math.floor(Math.random() * clusterIds.length)]; | |
| }, | |
| ORDER() { | |
| return clusterIds => clusterIds[0]; | |
| } | |
| }; | |
| class PoolNamespace { | |
| constructor(cluster, pattern, selector) { | |
| this._cluster = cluster; | |
| this._pattern = pattern; | |
| this._selector = makeSelector[selector](); | |
| } | |
| getConnection(cb) { | |
| const clusterNode = this._getClusterNode(); | |
| if (clusterNode === null) { | |
| return cb(new Error('Pool does Not exists.')); | |
| } | |
| return this._cluster._getConnection(clusterNode, (err, connection) => { | |
| if (err) { | |
| return cb(err); | |
| } | |
| if (connection === 'retry') { | |
| return this.getConnection(cb); | |
| } | |
| return cb(null, connection); | |
| }); | |
| } | |
| /** | |
| * pool cluster query | |
| * @param {*} sql | |
| * @param {*} values | |
| * @param {*} cb | |
| * @returns query | |
| */ | |
| query(sql, values, cb) { | |
| const query = Connection.createQuery(sql, values, cb, {}); | |
| this.getConnection((err, conn) => { | |
| if (err) { | |
| if (typeof query.onResult === 'function') { | |
| query.onResult(err); | |
| } else { | |
| query.emit('error', err); | |
| } | |
| return; | |
| } | |
| try { | |
| conn.query(query).once('end', () => { | |
| conn.release(); | |
| }); | |
| } catch (e) { | |
| conn.release(); | |
| throw e; | |
| } | |
| }); | |
| return query; | |
| } | |
| /** | |
| * pool cluster execute | |
| * @param {*} sql | |
| * @param {*} values | |
| * @param {*} cb | |
| */ | |
| execute(sql, values, cb) { | |
| if (typeof values === 'function') { | |
| cb = values; | |
| values = []; | |
| } | |
| this.getConnection((err, conn) => { | |
| if (err) { | |
| return cb(err); | |
| } | |
| try { | |
| conn.execute(sql, values, cb).once('end', () => { | |
| conn.release(); | |
| }); | |
| } catch (e) { | |
| conn.release(); | |
| throw e; | |
| } | |
| }); | |
| } | |
| _getClusterNode() { | |
| const foundNodeIds = this._cluster._findNodeIds(this._pattern); | |
| if (foundNodeIds.length === 0) { | |
| return null; | |
| } | |
| const nodeId = | |
| foundNodeIds.length === 1 | |
| ? foundNodeIds[0] | |
| : this._selector(foundNodeIds); | |
| return this._cluster._getNode(nodeId); | |
| } | |
| } | |
| class PoolCluster extends EventEmitter { | |
| constructor(config) { | |
| super(); | |
| config = config || {}; | |
| this._canRetry = | |
| typeof config.canRetry === 'undefined' ? true : config.canRetry; | |
| this._removeNodeErrorCount = config.removeNodeErrorCount || 5; | |
| this._defaultSelector = config.defaultSelector || 'RR'; | |
| this._closed = false; | |
| this._lastId = 0; | |
| this._nodes = {}; | |
| this._serviceableNodeIds = []; | |
| this._namespaces = {}; | |
| this._findCaches = {}; | |
| } | |
| of(pattern, selector) { | |
| pattern = pattern || '*'; | |
| selector = selector || this._defaultSelector; | |
| selector = selector.toUpperCase(); | |
| if (!makeSelector[selector] === 'undefined') { | |
| selector = this._defaultSelector; | |
| } | |
| const key = pattern + selector; | |
| if (typeof this._namespaces[key] === 'undefined') { | |
| this._namespaces[key] = new PoolNamespace(this, pattern, selector); | |
| } | |
| return this._namespaces[key]; | |
| } | |
| add(id, config) { | |
| if (typeof id === 'object') { | |
| config = id; | |
| id = `CLUSTER::${++this._lastId}`; | |
| } | |
| if (typeof this._nodes[id] === 'undefined') { | |
| this._nodes[id] = { | |
| id: id, | |
| errorCount: 0, | |
| pool: new Pool({ config: new PoolConfig(config) }) | |
| }; | |
| this._serviceableNodeIds.push(id); | |
| this._clearFindCaches(); | |
| } | |
| } | |
| getConnection(pattern, selector, cb) { | |
| let namespace; | |
| if (typeof pattern === 'function') { | |
| cb = pattern; | |
| namespace = this.of(); | |
| } else { | |
| if (typeof selector === 'function') { | |
| cb = selector; | |
| selector = this._defaultSelector; | |
| } | |
| namespace = this.of(pattern, selector); | |
| } | |
| namespace.getConnection(cb); | |
| } | |
| end(callback) { | |
| const cb = | |
| callback !== undefined | |
| ? callback | |
| : err => { | |
| if (err) { | |
| throw err; | |
| } | |
| }; | |
| if (this._closed) { | |
| process.nextTick(cb); | |
| return; | |
| } | |
| this._closed = true; | |
| let calledBack = false; | |
| let waitingClose = 0; | |
| const onEnd = err => { | |
| if (!calledBack && (err || --waitingClose <= 0)) { | |
| calledBack = true; | |
| return cb(err); | |
| } | |
| }; | |
| for (const id in this._nodes) { | |
| waitingClose++; | |
| this._nodes[id].pool.end(onEnd); | |
| } | |
| if (waitingClose === 0) { | |
| process.nextTick(onEnd); | |
| } | |
| } | |
| _findNodeIds(pattern) { | |
| if (typeof this._findCaches[pattern] !== 'undefined') { | |
| return this._findCaches[pattern]; | |
| } | |
| let foundNodeIds; | |
| if (pattern === '*') { | |
| // all | |
| foundNodeIds = this._serviceableNodeIds; | |
| } else if (this._serviceableNodeIds.indexOf(pattern) !== -1) { | |
| // one | |
| foundNodeIds = [pattern]; | |
| } else { | |
| // wild matching | |
| const keyword = pattern.substring(pattern.length - 1, 0); | |
| foundNodeIds = this._serviceableNodeIds.filter(id => | |
| id.startsWith(keyword) | |
| ); | |
| } | |
| this._findCaches[pattern] = foundNodeIds; | |
| return foundNodeIds; | |
| } | |
| _getNode(id) { | |
| return this._nodes[id] || null; | |
| } | |
| _increaseErrorCount(node) { | |
| if (++node.errorCount >= this._removeNodeErrorCount) { | |
| const index = this._serviceableNodeIds.indexOf(node.id); | |
| if (index !== -1) { | |
| this._serviceableNodeIds.splice(index, 1); | |
| delete this._nodes[node.id]; | |
| this._clearFindCaches(); | |
| node.pool.end(); | |
| this.emit('remove', node.id); | |
| } | |
| } | |
| } | |
| _decreaseErrorCount(node) { | |
| if (node.errorCount > 0) { | |
| --node.errorCount; | |
| } | |
| } | |
| _getConnection(node, cb) { | |
| node.pool.getConnection((err, connection) => { | |
| if (err) { | |
| this._increaseErrorCount(node); | |
| if (this._canRetry) { | |
| // REVIEW: this seems wrong? | |
| this.emit('warn', err); | |
| // eslint-disable-next-line no-console | |
| console.warn(`[Error] PoolCluster : ${err}`); | |
| return cb(null, 'retry'); | |
| } | |
| return cb(err); | |
| } | |
| this._decreaseErrorCount(node); | |
| connection._clusterId = node.id; | |
| return cb(null, connection); | |
| }); | |
| } | |
| _clearFindCaches() { | |
| this._findCaches = {}; | |
| } | |
| } | |
| module.exports = PoolCluster; | |