var util = require('util');
var PriorityQueue = require('./PriorityQueue');
/**
* Constructs a new pool with the provided factory options.
*
* @constructor
* @classdesc
* A resource pooling class with a promise-based API.
*
* @param {PromisePool.Factory} opts
* The connection factory which specifies the functionality for the pool.
*/
function PromisePool(opts) {
this._opts = {
// Configuration options
name: opts.name || 'pool',
idleTimeoutMillis: opts.idleTimeoutMillis || 30000,
reapInterval: opts.reapIntervalMillis || 1000,
drainCheckInterval: opts.drainCheckIntervalMillis || 100,
refreshIdle: ('refreshIdle' in opts) ? opts.refreshIdle : true,
returnToHead: opts.returnToHead || false,
max: parseInt(opts.max, 10),
min: parseInt(opts.min, 10),
// Client management methods.
create: opts.create,
destroy: opts.destroy,
validate: opts.validate || function(){ return true; },
onRelease: opts.onRelease
};
this._availableObjects = [];
this._waitingClients = new PriorityQueue(opts.priorityRange || 1);
this._count = 0;
this._removeIdleScheduled = false;
this._removeIdleTimer = null;
this._draining = false;
// Prepare a logger function.
if (opts.log instanceof Function) {
this._log = opts.log;
}
else if (opts.log) {
this._log = _logger.bind(this);
}
else {
this._log = function(){};
}
// Clean up some of the inputs.
this._validate = opts.validate || function(){ return true; };
this._opts.max = Math.max(isNaN(this._opts.max) ? 1 : this._opts.max, 1);
this._opts.min = Math.min(isNaN(this._opts.min) ? 0 : this._opts.min, this._opts.max-1);
// Finally, ensure a minimum number of connections right out of the gate.
_ensureMinimum.call(this);
}
// ---------------------------------------------------------------------------------------------- //
/**
* @callback PromisePool.AcquireCallback
*
* Callback expected when acquiring a client. The returned promise will be used to manage the life
* of the client. Once it is resolved or rejected the acquired client is released back into the
* pool.
*
* @param {PromisePool.Client} client
* A newly acquired client from the pool.
*
* @return {Promise.<*>} A promise for whatever result the callback wants to send out.
*/
/**
* @callback PromisePool.Factory.create
*
* Function used to create new resources. It is expected to return either a new `PromisePool.Client`
* or a promise for one.
*
* @return {Promise.<PromisePool.Client>} A promise for a new client.
*/
/**
* @callback PromisePool.Factory.destroy
*
* Function used to destroy resources. The returned promise should resolve when the resource has
* been fully destroyed.
*
* @param {PromisePool.Client} client - A resource that had been created earlier.
*
* @return {?Promise} If destruction is asynchronous, a promise should be returned that will resolve
* after the client is destroyed.
*/
/**
* @callback PromisePool.Factory.validate
*
* A function that checks the validity of a resource before it is handed to waiting clients.
*
* @param {PromisePool.Client} client - A resource that had been created earlier.
*
* @return {bool} True if the resource is still valid, otherwise false should be returned.
*/
/**
* @callback PromisePool.Factory.onRelease
*
* A function called whenever a client is released back to the pool. May be use to reset the state
* of the client or to wait for any queued requests finish before adding to the pool.
*
* @param {PromisePool.Client} client - A resource that has been released back to the pool.
*
* @return {?Promise.<*>} May return a promise, in which case the client wont join the pool until
* the promise resolves. If it is rejected, then the client will be destroyed instead.
*/
/**
* @callback PromisePool.Factory.log
*
* A function taking a log message and a log level.
*
* @param {string} msg
* The message to be logged.
*
* @param {string} level
* The importance of this log message. Possible values are: `verbose`, `info`, and `error`.
*/
/**
* @namespace PromisePool.Factory
*
* @property {string} name
* Name of the pool. Used only for logging.
*
* @property {PromisePool.Factory.create} create
* Should create the item to be acquired, and return either a promise or the new client.
*
* @property {PromisePool.Factory.destroy} destroy
* Should gently close any resources that the item is using. Called to destroy resources.
*
* @property {PromisePool.Factory.validate} validate
* Optional. Should return true if the resource is still valid and false if it should be removed
* from the pool. Called before a resource is acquired from the pool.
*
* @property {PromisePool.Factory.onRelease} onRelease
* Optional. May return a promise to indicate when a client is ready to be added back to the pool
* after being released.
*
* @property {number} max
* Optional. Maximum number of items that can exist at the same time. Any further acquire requests
* will be pushed to the waiting list. Defaults to `1`.
*
* @property {number} min
* Optional. Minimum number of items in pool (including in-use). When the pool is created, or a
* resource destroyed, this minimum will be checked. If the pool resource count is below the
* minimum a new resource will be created and added to the pool. Defaults to `0`.
*
* @property {number} idleTimeoutMillis
* Optional. Maximum period for resources to be idle (e.g. not acquired) before they are destroyed.
* Defaults to `30000` (30 seconds).
*
* @property {number} reapIntervalMillis
* Optional. How frequently the pool will check for idle resources that need to be destroyed.
* Defaults to `1000` (1 second).
*
* @property {number} drainCheckIntervalMillis
* Optional. How frequently the pool will check the status of waiting clients and unreturned
* resources before destroying all the resources. Defaults to `100` (1/10th a second).
*
* @property {bool|PromisePool.Factory.log} log
* Optional. Whether the pool should log activity. If a function is provided, it will be called to
* log messages. If `true` is provided, messages are logged to `console.log`. Defaults to `false`.
*
* @property {number} priorityRange
* Optional. The range from 1 to be treated as a valid priority. Default is `1`.
*
* @property {bool} refreshIdle
* Optional. Indicates if idle resources should be destroyed when left idle for `idleTimeoutMillis`
* milliseconds. Defaults to true.
*
* @property {bool} returnToHead
* Optional. Returns released object to the head of the available objects list. Default is false.
*/
// ---------------------------------------------------------------------------------------------- //
/**
* Default logging method, just logs to `console.log`.
*
* @private
* @memberof PromisePool
*
* @param {string}
*/
function _logger(str, level) {
console.log(level.toUpperCase() + " pool " + this._opts.name + " - " + str);
}
/**
* Constructs more resources to bring the current count up to the minimum specified in the factory.
*
* @private
* @memberof PromisePool
*
* @return {Promise} A promise to create all the resources needed.
*/
function _ensureMinimum() {
// Nothing to do if draining.
if (this._draining) {
return Promise.resolve();
}
var diff = this._opts.min - this._count;
var promises = [];
for (var i = 0; i < diff; ++i) {
promises.push(this.acquire(function(client){ return Promise.resolve(); }));
}
return Promise.all(promises).then(function(){});
}
/**
* Constructs a new resource.
*
* @private
* @memberof PromisePool
*/
function _createResource() {
this._log(
util.format(
'PromisePool._createResource() - creating client - count=%d min=%d max=%d',
this._count, this._opts.min, this._opts.max
),
'verbose'
);
return Promise.resolve(this._opts.create());
}
/**
* Checks and removes the available (idle) clients that have timed out.
*
* @private
* @memberof PromisePool
*/
function _removeIdle() {
var removals = [];
var now = Date.now();
this._removeIdleScheduled = false;
// Go through the available (idle) items, check if they have timed out
var minCount = this._count - this._opts.min;
var refreshIdle = this._opts.refreshIdle;
for (
var i = 0;
i < this._availableObjects.length && (refreshIdle || (minCount > removals.length));
++i
) {
var timeout = this._availableObjects[i].timeout;
if (now >= timeout) {
// Client timed out, so destroy it.
this._log(
'removeIdle() destroying obj - now:' + now + ' timeout:' + timeout,
'verbose'
);
removals.push(this.destroy(this._availableObjects[i].obj));
--i;
}
}
// Replace the available items with the ones to keep.
if (this._availableObjects.length > 0) {
this._log('availableObjects.length=' + this._availableObjects.length, 'verbose');
_scheduleRemoveIdle.call(this);
}
else {
this._log('removeIdle() all objects removed', 'verbose');
}
// Return a promise for when all the destructions have completed.
return Promise.all(removals);
}
/**
* Schedule removal of idle items in the pool.
*
* Only one removal at a time can be scheduled.
*
* @private
* @memberof PromisePool
*/
function _scheduleRemoveIdle(){
if (!this._removeIdleScheduled) {
this._removeIdleScheduled = true;
this._removeIdleTimer = setTimeout(_removeIdle.bind(this), this._opts.reapInterval);
}
}
/**
* Try to get a new client to work, and clean up pool unused (idle) items.
*
* @private
* @memberof PromisePool
*
* - If there are available clients waiting, shift the first one out (LIFO), and call its callback.
* - If there are no waiting clients, try to create one if it won't exceed the maximum number of
* clients.
* - If creating a new client would exceed the maximum, add the client to the wait list.
*/
function _dispense() {
var waitingCount = this._waitingClients.length;
this._log(
'dispense() clients=' + waitingCount + ' available=' + this._availableObjects.length,
'info'
);
if (waitingCount > 0) {
while (this._availableObjects.length > 0) {
this._log('dispense() - reusing obj', 'verbose');
var objWithTimeout = this._availableObjects[0];
// Make sure the client is still valid before handing it back.
if (!this._opts.validate(objWithTimeout.obj)) {
this.destroy(objWithTimeout.obj); // Don't care about waiting for this.
continue;
}
// We have a valid, idle client: ship it!
this._availableObjects.shift();
this._waitingClients.dequeue().resolve(objWithTimeout.obj);
return; // Only dispense one.
}
if (this._count < this._opts.max) {
var self = this;
++this._count;
_createResource.call(this).then(function(client){
self._waitingClients.dequeue().resolve(client);
}, function(err){
--self._count;
self._waitingClients.dequeue().reject(err);
});
}
}
}
/**
* Creates a filter usable for finding a client in the available clients list.
*
* @private
* @memberof PromisePool
*
* @param {PromisePool.Client} obj
* The client to filter the list for.
*
* @param {bool} eql
* Indicates if the test should be for equality (`===`) or not (`!==`).
*
* @return {Function} A function which can be used for array filtering methods.
*/
function _objFilter(obj, eql) {
return function(objWithTimeout){
return (eql ? (obj === objWithTimeout.obj) : (obj !== objWithTimeout.obj));
};
}
// ---------------------------------------------------------------------------------------------- //
/**
* Request a new client. The callback will be called with a client when one becomes available.
*
* @param {PromisePool.AcquireCallback} callback
* Callback function to be called after the acquire is successful. The function will receive the
* acquired item as the first parameter.
*
* @param {Number} priority
* Optional. Integer between 0 and (priorityRange - 1). Specifies the priority of the caller if
* there are no available resources. Lower numbers mean higher priority.
*
* @returns {Promise.<*>} A promise for the results of the acquire callback.
*/
PromisePool.prototype.acquire = function(callback, priority) {
if (this._draining) {
throw new Error("Pool is draining and cannot accept work");
}
var self = this;
var waiter = {};
waiter.promise = new Promise(function(resolve, reject){
waiter.resolve = resolve;
waiter.reject = reject;
}).then(function(client){
return new Promise(function(resolve, reject){
try {
return callback(client)
.then(
function(res){ self.release(client); resolve(res); },
function(err){ self.release(client); reject(err); }
);
}
catch (err) {
self.release(client);
reject(err);
}
});
});
this._waitingClients.enqueue(waiter, priority);
process.nextTick(_dispense.bind(this));
return waiter.promise;
};
/**
* Return the client to the pool, in case it is no longer required.
*
* @param {PromisePool.Client} obj
* The acquired object to be put back to the pool.
*/
PromisePool.prototype.release = function(obj) {
// Check to see if this object has already been released (e.g. back in the pool of
// availableObjects)
if (this._availableObjects.some(_objFilter(obj, true))) {
this._log('release called twice for the same resource: ' + (new Error().stack), 'error');
return;
}
if (obj.__promisePool_destroyed) {
this._log('Released resource is destroyed, not returning to pool.', 'info');
}
else {
var self = this;
var _release = function(){
var objWithTimeout = {
obj: obj,
timeout: (Date.now() + self._opts.idleTimeoutMillis)
};
if (self._opts.returnToHead) {
self._availableObjects.unshift(objWithTimeout);
}
else {
self._availableObjects.push(objWithTimeout);
}
self._log('timeout: ' + objWithTimeout.timeout, 'verbose');
}
if (this._opts.onRelease) {
Promise.resolve(this._opts.onRelease(obj)).then(function(){
_release();
process.nextTick(_dispense.bind(this));
_scheduleRemoveIdle.call(this);
}, function(err){
self._log('Error releasing client: ' + err.stack, 'error');
self.destroy(obj);
});
}
else {
_release();
}
}
process.nextTick(_dispense.bind(this));
_scheduleRemoveIdle.call(this);
};
/**
* Request the client to be destroyed. The factory's destroy handler
* will also be called.
*
* This should be called within an acquire() block as an alternative to release().
*
* @param {PromisePool.Client} obj
* The acquired item to be destoyed.
*/
PromisePool.prototype.destroy = function(obj) {
this._log('Destroying object, count=' + this._count, 'verbose');
--this._count;
this._availableObjects = this._availableObjects.filter(_objFilter(obj, false));
var self = this;
return Promise.resolve(this._opts.destroy(obj))
.then(function(){
obj.__promisePool_destroyed = true;
return _ensureMinimum.call(self);
});
};
/**
* Disallow any new requests and let the request backlog dissapate.
*
* After all clients have finished, the pool will then destroy all pooled resources.
*
* @return {Promise} A promise to let all clients finish and destroy all pooled objects.
*/
PromisePool.prototype.drain = function(){
this._log('draining', 'info');
// Disable the ability to put more work on the queue.
this._draining = true;
var self = this;
return new Promise(function(resolve, reject){
function check(){
if (self._waitingClients.length > 0) {
// Wait until all client requests have been satisfied.
self._log(
'Delaying drain, ' + self._waitingClients.length + ' clients in queue.',
'verbose'
);
setTimeout(check, self._opts.drainCheckInterval);
}
else if (self._availableObjects.length < self._count) {
// Wait until all objects have been released.
var missingCount = self._count - self._availableObjects.length;
self._log(
'Delaying drain, ' + missingCount + ' items need to be released.',
'verbose'
);
setTimeout(check, self._opts.drainCheckInterval);
}
else {
// We have no waiting clients, and all objects have been returned to the pool. Now
// we clean up by destroying everything.
self.destroyAllNow().then(resolve, reject);
}
};
check();
});
};
/**
* Forcibly destroys all clients regardless of timeout.
*
* Intended to be invoked as part of a drain. Does not prevent the creation of new clients as a
* result of subsequent calls to acquire.
*
* Note that if `factory.min > 0` and the pool is not draining, the pool will destroy all idle
* resources in the pool, but replace them with newly created resources up to the specified
* `factory.min` value. If this is not desired, set `factory.min` to zero before calling
* `PromisePool#destroyAllNow()`.
*
* @return {Promise} A promise to have all objects in the pool destroyed.
*/
PromisePool.prototype.destroyAllNow = function() {
this._log('force destroying all objects', 'info');
// Stop the idle object removal checker, we're about to remove all of them now.
this._removeIdleScheduled = false;
clearTimeout(this._removeIdleTimer);
// Repeatedly call destroy until no more objects are available.
var destroyPromises = [];
while (this._availableObjects.length > 0) {
destroyPromises.push(this.destroy(this._availableObjects[0].obj));
}
return Promise.all(destroyPromises).then(function(){});
};
/**
* Decorates a function to use a acquired client from the object pool when called.
*
* @param {PromisePool.AcquireCallback} decorated
* The decorated function, accepting a client as the first argument and returning a promise.
*
* @param {Number} priority
* Optional. Integer between 0 and (priorityRange - 1). Specifies the priority of the caller if
* there are no available resources. Lower numbers mean higher priority.
*
* @return {Function} A function wrapping `decorated` by first acquiring a client.
*/
PromisePool.prototype.pooled = function(decorated, priority){
var self = this;
var slice = Array.prototype.slice;
return function(){
var args = slice.call(arguments);
var wrappedSelf = this;
return self.acquire(function(client){
args.unshift(client);
return decorated.apply(wrappedSelf, args);
}, priority);
};
};
/**
* The total number of resources in the pool.
*
* @readonly
* @member {number} PromisePool.prototype.length
*/
Object.defineProperty(PromisePool.prototype, 'length', {
get: function(){ return this._count; },
enumerable: true
});
/**
* The name of the pool, as provided in the factory.
*
* @readonly
* @member {string} PromisePool.prototype.name
*/
Object.defineProperty(PromisePool.prototype, 'name', {
get: function(){ return this._opts.name; },
enumerable: true
});
/**
* The number of available (e.g. idle) resources in the pool.
*
* @readonly
* @member {number} PromisePool.prototype.availableLength
*/
Object.defineProperty(PromisePool.prototype, 'availableLength', {
get: function(){ return this._availableObjects.length; },
enumerable: true
});
/**
* The number of clients currently waiting for a resource to become available/be created.
*
* @readonly
* @member {number} PromisePool.prototype.waitingClientLength
*/
Object.defineProperty(PromisePool.prototype, 'waitingClientLength', {
get: function(){ return this._waitingClients.length; },
enumerable: true
});
/**
* The maximum number of resources this pool will create.
*
* @readonly
* @member {number} PromisePool.prototype.max
*/
Object.defineProperty(PromisePool.prototype, 'max', {
get: function(){ return this._opts.max; },
enumerable: true
});
/**
* The minimum number of resources the pool will keep at any given time.
*
* @readonly
* @member {number} PromisePool.prototype.min
*/
Object.defineProperty(PromisePool.prototype, 'min', {
get: function(){ return this._opts.min; },
enumerable: true
});
module.exports = PromisePool;