Spaces:
Runtime error
Runtime error
| const Timer = require('../util/timer'); | |
| /** | |
| * This class uses the token bucket algorithm to control a queue of tasks. | |
| */ | |
| class TaskQueue { | |
| /** | |
| * Creates an instance of TaskQueue. | |
| * To allow bursts, set `maxTokens` to several times the average task cost. | |
| * To prevent bursts, set `maxTokens` to the cost of the largest tasks. | |
| * Note that tasks with a cost greater than `maxTokens` will be rejected. | |
| * | |
| * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size). | |
| * @param {number} refillRate - the number of tokens to be added per second (sustain rate). | |
| * @param {object} options - optional settings for the new task queue instance. | |
| * @property {number} startingTokens - the number of tokens the bucket starts with (default: `maxTokens`). | |
| * @property {number} maxTotalCost - reject a task if total queue cost would pass this limit (default: no limit). | |
| * @memberof TaskQueue | |
| */ | |
| constructor (maxTokens, refillRate, options = {}) { | |
| this._maxTokens = maxTokens; | |
| this._refillRate = refillRate; | |
| this._pendingTaskRecords = []; | |
| this._tokenCount = options.hasOwnProperty('startingTokens') ? options.startingTokens : maxTokens; | |
| this._maxTotalCost = options.hasOwnProperty('maxTotalCost') ? options.maxTotalCost : Infinity; | |
| this._timer = new Timer(); | |
| this._timer.start(); | |
| this._timeout = null; | |
| this._lastUpdateTime = this._timer.timeElapsed(); | |
| this._runTasks = this._runTasks.bind(this); | |
| } | |
| /** | |
| * Get the number of queued tasks which have not yet started. | |
| * | |
| * @readonly | |
| * @memberof TaskQueue | |
| */ | |
| get length () { | |
| return this._pendingTaskRecords.length; | |
| } | |
| /** | |
| * Wait until the token bucket is full enough, then run the provided task. | |
| * | |
| * @param {Function} task - the task to run. | |
| * @param {number} [cost=1] - the number of tokens this task consumes from the bucket. | |
| * @returns {Promise} - a promise for the task's return value. | |
| * @memberof TaskQueue | |
| */ | |
| do (task, cost = 1) { | |
| if (this._maxTotalCost < Infinity) { | |
| const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0); | |
| if (currentTotalCost + cost > this._maxTotalCost) { | |
| return Promise.reject('Maximum total cost exceeded'); | |
| } | |
| } | |
| const newRecord = { | |
| cost | |
| }; | |
| newRecord.promise = new Promise((resolve, reject) => { | |
| newRecord.cancel = () => { | |
| reject(new Error('Task canceled')); | |
| }; | |
| // The caller, `_runTasks()`, is responsible for cost-checking and spending tokens. | |
| newRecord.wrappedTask = () => { | |
| try { | |
| resolve(task()); | |
| } catch (e) { | |
| reject(e); | |
| } | |
| }; | |
| }); | |
| this._pendingTaskRecords.push(newRecord); | |
| // If the queue has been idle we need to prime the pump | |
| if (this._pendingTaskRecords.length === 1) { | |
| this._runTasks(); | |
| } | |
| return newRecord.promise; | |
| } | |
| /** | |
| * Cancel one pending task, rejecting its promise. | |
| * | |
| * @param {Promise} taskPromise - the promise returned by `do()`. | |
| * @returns {boolean} - true if the task was found, or false otherwise. | |
| * @memberof TaskQueue | |
| */ | |
| cancel (taskPromise) { | |
| const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise); | |
| if (taskIndex !== -1) { | |
| const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1); | |
| taskRecord.cancel(); | |
| if (taskIndex === 0 && this._pendingTaskRecords.length > 0) { | |
| this._runTasks(); | |
| } | |
| return true; | |
| } | |
| return false; | |
| } | |
| /** | |
| * Cancel all pending tasks, rejecting all their promises. | |
| * | |
| * @memberof TaskQueue | |
| */ | |
| cancelAll () { | |
| if (this._timeout !== null) { | |
| this._timer.clearTimeout(this._timeout); | |
| this._timeout = null; | |
| } | |
| const oldTasks = this._pendingTaskRecords; | |
| this._pendingTaskRecords = []; | |
| oldTasks.forEach(r => r.cancel()); | |
| } | |
| /** | |
| * Shorthand for calling _refill() then _spend(cost). | |
| * | |
| * @see {@link TaskQueue#_refill} | |
| * @see {@link TaskQueue#_spend} | |
| * @param {number} cost - the number of tokens to try to spend. | |
| * @returns {boolean} true if we had enough tokens; false otherwise. | |
| * @memberof TaskQueue | |
| */ | |
| _refillAndSpend (cost) { | |
| this._refill(); | |
| return this._spend(cost); | |
| } | |
| /** | |
| * Refill the token bucket based on the amount of time since the last refill. | |
| * | |
| * @memberof TaskQueue | |
| */ | |
| _refill () { | |
| const now = this._timer.timeElapsed(); | |
| const timeSinceRefill = now - this._lastUpdateTime; | |
| if (timeSinceRefill <= 0) return; | |
| this._lastUpdateTime = now; | |
| this._tokenCount += timeSinceRefill * this._refillRate / 1000; | |
| this._tokenCount = Math.min(this._tokenCount, this._maxTokens); | |
| } | |
| /** | |
| * If we can "afford" the given cost, subtract that many tokens and return true. | |
| * Otherwise, return false. | |
| * | |
| * @param {number} cost - the number of tokens to try to spend. | |
| * @returns {boolean} true if we had enough tokens; false otherwise. | |
| * @memberof TaskQueue | |
| */ | |
| _spend (cost) { | |
| if (cost <= this._tokenCount) { | |
| this._tokenCount -= cost; | |
| return true; | |
| } | |
| return false; | |
| } | |
| /** | |
| * Loop until the task queue is empty, running each task and spending tokens to do so. | |
| * Any time the bucket can't afford the next task, delay asynchronously until it can. | |
| * | |
| * @memberof TaskQueue | |
| */ | |
| _runTasks () { | |
| if (this._timeout) { | |
| this._timer.clearTimeout(this._timeout); | |
| this._timeout = null; | |
| } | |
| for (;;) { | |
| const nextRecord = this._pendingTaskRecords.shift(); | |
| if (!nextRecord) { | |
| // We ran out of work. Go idle until someone adds another task to the queue. | |
| return; | |
| } | |
| if (nextRecord.cost > this._maxTokens) { | |
| throw new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`); | |
| } | |
| // Refill before each task in case the time it took for the last task to run was enough to afford the next. | |
| if (this._refillAndSpend(nextRecord.cost)) { | |
| nextRecord.wrappedTask(); | |
| } else { | |
| // We can't currently afford this task. Put it back and wait until we can and try again. | |
| this._pendingTaskRecords.unshift(nextRecord); | |
| const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0); | |
| const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); | |
| this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait); | |
| return; | |
| } | |
| } | |
| } | |
| } | |
| module.exports = TaskQueue; | |