Spaces:
Running
Running
; | |
const process = require('process'); | |
const Pool = require('./pool.js'); | |
const PoolConfig = require('./pool_config.js'); | |
const Connection = require('./connection.js'); | |
const EventEmitter = require('events').EventEmitter; | |
/** | |
* Selector | |
*/ | |
const makeSelector = { | |
RR() { | |
let index = 0; | |
return clusterIds => clusterIds[index++ % clusterIds.length]; | |
}, | |
RANDOM() { | |
return clusterIds => | |
clusterIds[Math.floor(Math.random() * clusterIds.length)]; | |
}, | |
ORDER() { | |
return clusterIds => clusterIds[0]; | |
} | |
}; | |
class PoolNamespace { | |
constructor(cluster, pattern, selector) { | |
this._cluster = cluster; | |
this._pattern = pattern; | |
this._selector = makeSelector[selector](); | |
} | |
getConnection(cb) { | |
const clusterNode = this._getClusterNode(); | |
if (clusterNode === null) { | |
return cb(new Error('Pool does Not exists.')); | |
} | |
return this._cluster._getConnection(clusterNode, (err, connection) => { | |
if (err) { | |
return cb(err); | |
} | |
if (connection === 'retry') { | |
return this.getConnection(cb); | |
} | |
return cb(null, connection); | |
}); | |
} | |
/** | |
* pool cluster query | |
* @param {*} sql | |
* @param {*} values | |
* @param {*} cb | |
* @returns query | |
*/ | |
query(sql, values, cb) { | |
const query = Connection.createQuery(sql, values, cb, {}); | |
this.getConnection((err, conn) => { | |
if (err) { | |
if (typeof query.onResult === 'function') { | |
query.onResult(err); | |
} else { | |
query.emit('error', err); | |
} | |
return; | |
} | |
try { | |
conn.query(query).once('end', () => { | |
conn.release(); | |
}); | |
} catch (e) { | |
conn.release(); | |
throw e; | |
} | |
}); | |
return query; | |
} | |
/** | |
* pool cluster execute | |
* @param {*} sql | |
* @param {*} values | |
* @param {*} cb | |
*/ | |
execute(sql, values, cb) { | |
if (typeof values === 'function') { | |
cb = values; | |
values = []; | |
} | |
this.getConnection((err, conn) => { | |
if (err) { | |
return cb(err); | |
} | |
try { | |
conn.execute(sql, values, cb).once('end', () => { | |
conn.release(); | |
}); | |
} catch (e) { | |
conn.release(); | |
throw e; | |
} | |
}); | |
} | |
_getClusterNode() { | |
const foundNodeIds = this._cluster._findNodeIds(this._pattern); | |
if (foundNodeIds.length === 0) { | |
return null; | |
} | |
const nodeId = | |
foundNodeIds.length === 1 | |
? foundNodeIds[0] | |
: this._selector(foundNodeIds); | |
return this._cluster._getNode(nodeId); | |
} | |
} | |
class PoolCluster extends EventEmitter { | |
constructor(config) { | |
super(); | |
config = config || {}; | |
this._canRetry = | |
typeof config.canRetry === 'undefined' ? true : config.canRetry; | |
this._removeNodeErrorCount = config.removeNodeErrorCount || 5; | |
this._defaultSelector = config.defaultSelector || 'RR'; | |
this._closed = false; | |
this._lastId = 0; | |
this._nodes = {}; | |
this._serviceableNodeIds = []; | |
this._namespaces = {}; | |
this._findCaches = {}; | |
} | |
of(pattern, selector) { | |
pattern = pattern || '*'; | |
selector = selector || this._defaultSelector; | |
selector = selector.toUpperCase(); | |
if (!makeSelector[selector] === 'undefined') { | |
selector = this._defaultSelector; | |
} | |
const key = pattern + selector; | |
if (typeof this._namespaces[key] === 'undefined') { | |
this._namespaces[key] = new PoolNamespace(this, pattern, selector); | |
} | |
return this._namespaces[key]; | |
} | |
add(id, config) { | |
if (typeof id === 'object') { | |
config = id; | |
id = `CLUSTER::${++this._lastId}`; | |
} | |
if (typeof this._nodes[id] === 'undefined') { | |
this._nodes[id] = { | |
id: id, | |
errorCount: 0, | |
pool: new Pool({ config: new PoolConfig(config) }) | |
}; | |
this._serviceableNodeIds.push(id); | |
this._clearFindCaches(); | |
} | |
} | |
getConnection(pattern, selector, cb) { | |
let namespace; | |
if (typeof pattern === 'function') { | |
cb = pattern; | |
namespace = this.of(); | |
} else { | |
if (typeof selector === 'function') { | |
cb = selector; | |
selector = this._defaultSelector; | |
} | |
namespace = this.of(pattern, selector); | |
} | |
namespace.getConnection(cb); | |
} | |
end(callback) { | |
const cb = | |
callback !== undefined | |
? callback | |
: err => { | |
if (err) { | |
throw err; | |
} | |
}; | |
if (this._closed) { | |
process.nextTick(cb); | |
return; | |
} | |
this._closed = true; | |
let calledBack = false; | |
let waitingClose = 0; | |
const onEnd = err => { | |
if (!calledBack && (err || --waitingClose <= 0)) { | |
calledBack = true; | |
return cb(err); | |
} | |
}; | |
for (const id in this._nodes) { | |
waitingClose++; | |
this._nodes[id].pool.end(onEnd); | |
} | |
if (waitingClose === 0) { | |
process.nextTick(onEnd); | |
} | |
} | |
_findNodeIds(pattern) { | |
if (typeof this._findCaches[pattern] !== 'undefined') { | |
return this._findCaches[pattern]; | |
} | |
let foundNodeIds; | |
if (pattern === '*') { | |
// all | |
foundNodeIds = this._serviceableNodeIds; | |
} else if (this._serviceableNodeIds.indexOf(pattern) !== -1) { | |
// one | |
foundNodeIds = [pattern]; | |
} else { | |
// wild matching | |
const keyword = pattern.substring(pattern.length - 1, 0); | |
foundNodeIds = this._serviceableNodeIds.filter(id => | |
id.startsWith(keyword) | |
); | |
} | |
this._findCaches[pattern] = foundNodeIds; | |
return foundNodeIds; | |
} | |
_getNode(id) { | |
return this._nodes[id] || null; | |
} | |
_increaseErrorCount(node) { | |
if (++node.errorCount >= this._removeNodeErrorCount) { | |
const index = this._serviceableNodeIds.indexOf(node.id); | |
if (index !== -1) { | |
this._serviceableNodeIds.splice(index, 1); | |
delete this._nodes[node.id]; | |
this._clearFindCaches(); | |
node.pool.end(); | |
this.emit('remove', node.id); | |
} | |
} | |
} | |
_decreaseErrorCount(node) { | |
if (node.errorCount > 0) { | |
--node.errorCount; | |
} | |
} | |
_getConnection(node, cb) { | |
node.pool.getConnection((err, connection) => { | |
if (err) { | |
this._increaseErrorCount(node); | |
if (this._canRetry) { | |
// REVIEW: this seems wrong? | |
this.emit('warn', err); | |
// eslint-disable-next-line no-console | |
console.warn(`[Error] PoolCluster : ${err}`); | |
return cb(null, 'retry'); | |
} | |
return cb(err); | |
} | |
this._decreaseErrorCount(node); | |
connection._clusterId = node.id; | |
return cb(null, connection); | |
}); | |
} | |
_clearFindCaches() { | |
this._findCaches = {}; | |
} | |
} | |
module.exports = PoolCluster; | |