// Spaces:
// Running
// Running
/* eslint-disable no-var */

// Object pool used by the queue to recycle its per-task holder objects.
var reusify = require('reusify')
/**
 * Creates an in-memory work queue that hands queued values to `worker`,
 * keeping at most `concurrency` workers in flight at the same time.
 *
 * May be called as `fastqueue(context, worker, concurrency)` or, when no
 * context is needed, as `fastqueue(worker, concurrency)`.
 *
 * @param {*} context - `this` value used when invoking `worker` and the
 *   per-task callbacks. If a function is passed here it is taken to be the
 *   worker and the remaining arguments shift left.
 * @param {Function} worker - Called as `worker(value, done)`; it signals
 *   completion by calling `done(err, result)`.
 * @param {number} concurrency - Maximum number of workers running at once.
 * @returns {object} The queue object (`push`, `unshift`, `pause`, `resume`,
 *   `idle`, `length`, `getQueue`, `running`, `kill`, `killAndDrain`, `error`,
 *   plus the assignable hooks `drain`, `saturated` and `empty`).
 * @throws {Error} If `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  // Pool of Task holders, so finished holders are recycled.
  var cache = reusify(Task)
  // Waiting tasks form a singly linked list: queueHead is next to run.
  var queueHead = null
  var queueTail = null
  // Number of workers currently in flight.
  var _running = 0
  // Handler registered through error(); copied onto every task when enqueued.
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  /** @returns {number} How many workers are currently in flight. */
  function running () {
    return _running
  }

  /** Stops new tasks from being started; workers already in flight continue. */
  function pause () {
    self.paused = true
  }

  /** @returns {number} How many tasks are waiting (walks the linked list). */
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  /** @returns {Array} The values of the waiting tasks, in execution order. */
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  /**
   * Leaves the paused state and starts waiting tasks, but only into the slots
   * that are actually free, so `concurrency` is never exceeded.
   */
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting: borrow a slot and give it straight back so that
      // release() fires drain() (once) when no worker is running either.
      _running++
      release()
      return
    }

    // Each `_running++` + release() pair starts exactly one waiting task.
    while (queueHead && _running < self.concurrency) {
      _running++
      release()
    }
  }

  /** @returns {boolean} True when nothing is running and nothing is waiting. */
  function idle () {
    return _running === 0 && self.length() === 0
  }

  /**
   * Adds a task at the end of the queue, or runs it right away when a slot is
   * free and the queue is not paused.
   *
   * @param {*} value - Value handed to the worker.
   * @param {Function} [done] - Called as `done(err, result)` on completion.
   */
  function push (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    current.errorHandler = errorHandler

    if (_running === self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        // First task that has to wait: notify that the queue is saturated.
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Adds a task at the front of the queue, or runs it right away when a slot
   * is free and the queue is not paused.
   *
   * @param {*} value - Value handed to the worker.
   * @param {Function} [done] - Called as `done(err, result)` on completion.
   */
  function unshift (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    // Holders are recycled, so this must be assigned on every enqueue;
    // otherwise the task keeps whatever handler its previous use left behind.
    current.errorHandler = errorHandler

    if (_running === self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        // First task that has to wait: notify that the queue is saturated.
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Called when a worker slot becomes available. Recycles `holder` (if any)
   * and either starts the next waiting task or gives the slot back.
   *
   * @param {object} [holder] - Finished task holder to return to the pool.
   */
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }

    var next = queueHead

    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        // The freed slot is handed to `next`, so _running is left unchanged.
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        // Paused: the waiting task stays queued and the slot is given back.
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  /** Drops every waiting task and resets `drain` without calling it. */
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  /** Drops every waiting task, calls `drain` once, then resets it. */
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  /**
   * Registers the handler attached to tasks enqueued from now on.
   *
   * @param {Function} handler - Called as `handler(err, value)` when a task
   *   completes.
   */
  function error (handler) {
    errorHandler = handler
  }
}
/** Does nothing; used as the default for optional callbacks and hooks. */
function noop () {}
/**
 * Holder for one queued unit of work. Its fields are filled in by the queue
 * when the task is enqueued; `next` links waiting tasks together.
 *
 * `worked` is created once per holder and closes over it, so it can be handed
 * to the worker as the completion callback without binding.
 */
function Task () {
  this.value = null
  this.callback = noop
  this.next = null
  this.release = noop
  this.context = null
  this.errorHandler = null

  var self = this

  /**
   * Completion callback given to the worker.
   *
   * @param {*} err - Error reported by the worker, if any.
   * @param {*} result - Result reported by the worker.
   */
  this.worked = function worked (err, result) {
    // Snapshot the fields first, then clear them before any user code runs.
    var callback = self.callback
    var errorHandler = self.errorHandler
    var val = self.value

    self.value = null
    self.callback = noop

    // The handler, when set, is invoked for every completion with the
    // worker's `err` (whatever it is) and the original task value.
    if (errorHandler) {
      errorHandler(err, val)
    }
    callback.call(self.context, err, result)
    // Hand the holder back so the queue can recycle it and start more work.
    self.release(self)
  }
}
/**
 * Promise flavour of the queue: `worker` returns a promise, and `push` /
 * `unshift` return a promise for the task's outcome.
 *
 * May be called as `queueAsPromised(context, worker, concurrency)` or as
 * `queueAsPromised(worker, concurrency)`.
 *
 * @param {*} context - `this` value for the worker; if a function is passed
 *   here it is taken to be the worker and the other arguments shift left.
 * @param {Function} worker - Called as `worker(value)`; must return a thenable.
 * @param {number} concurrency - Maximum number of workers running at once.
 * @returns {object} The queue returned by `fastqueue`, with `push` and
 *   `unshift` replaced by promise-returning versions and `drained()` added.
 */
function queueAsPromised (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // Adapts the promise-returning worker to the callback contract of fastqueue.
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  // Keep the callback-style originals before overriding them.
  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  // Shared by push/unshift: enqueue through `enqueue` and expose the outcome
  // as a promise.
  function enqueueAsPromise (enqueue, value) {
    var p = new Promise(function (resolve, reject) {
      enqueue(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  /**
   * @param {*} value - Value handed to the worker.
   * @returns {Promise} Settles with the worker's outcome for this task.
   */
  function push (value) {
    return enqueueAsPromise(pushCb, value)
  }

  /**
   * Same as `push`, but the task is placed at the front of the queue.
   *
   * @param {*} value - Value handed to the worker.
   * @returns {Promise} Settles with the worker's outcome for this task.
   */
  function unshift (value) {
    return enqueueAsPromise(unshiftCb, value)
  }

  /**
   * @returns {Promise} Resolves the next time the queue drains, or right away
   *   (asynchronously) when the queue is already idle.
   */
  function drained () {
    return new Promise(function (resolve) {
      // Deferred by one microtask so that tasks pushed synchronously right
      // after calling drained() are taken into account by the idle check.
      Promise.resolve().then(function () {
        if (queue.idle()) {
          // Nothing to wait for: drain() would never fire.
          resolve()
          return
        }

        var previousDrain = queue.drain

        queue.drain = function drainOnce () {
          if (typeof previousDrain === 'function') {
            previousDrain()
          }
          resolve()
          // One-shot hook: put the previous handler back, unless something
          // else has replaced the hook in the meantime.
          if (queue.drain === drainOnce) {
            queue.drain = previousDrain
          }
        }
      })
    })
  }
}
// Public API: the callback-style factory, with the promise flavour on `.promise`.
module.exports = fastqueue
module.exports.promise = queueAsPromised