|
|
|
|
|
|
|
|
|
|
|
|
|
'use strict'; |
|
|
|
const allSettled = require('@ungap/promise-all-settled').bind(Promise); |
|
const Runner = require('../runner'); |
|
const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants; |
|
const debug = require('debug')('mocha:parallel:parallel-buffered-runner'); |
|
const {BufferedWorkerPool} = require('./buffered-worker-pool'); |
|
const {setInterval, clearInterval} = global; |
|
const {createMap, constants} = require('../utils'); |
|
const {MOCHA_ID_PROP_NAME} = constants; |
|
const {createFatalError} = require('../errors'); |
|
|
|
/**
 * Resolved path of the `./reporters/parallel-buffered` module.
 * `ParallelBufferedRunner` uses it as the initial value of `_workerReporter`,
 * i.e. the `reporter` option sent along with every file handed to the pool.
 */
const DEFAULT_WORKER_REPORTER = require.resolve('./reporters/parallel-buffered');
|
|
|
|
|
|
|
|
|
/**
 * Option names that are deleted from the options object before it is passed
 * on to the worker pool for each file (see `ParallelBufferedRunner#run`).
 */
const DENY_OPTIONS = [
  // global fixtures
  'globalSetup',
  'globalTeardown',
  // parallel-mode flags and their short aliases
  'parallel',
  'p',
  'jobs',
  'j'
];
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Writes a one-line summary of the pool's worker/task counters to the
 * `debug` logger.
 *
 * @param {{stats: function(): object}} pool - Object whose `stats()` method
 *   returns `totalWorkers`, `busyWorkers`, `idleWorkers` and `pendingTasks`.
 */
const debugStats = pool => {
  const poolStats = pool.stats();
  debug(
    '%d/%d busy workers; %d idle; %d tasks queued',
    poolStats.busyWorkers,
    poolStats.totalWorkers,
    poolStats.idleWorkers,
    poolStats.pendingTasks
  );
};
|
|
|
|
|
|
|
|
|
/**
 * Delay, in milliseconds, between two `debugStats()` calls scheduled with
 * `setInterval` while a run is in progress.
 */
const DEBUG_STATS_INTERVAL = 5 * 1000;

// Names of the runner's lifecycle states, used as keys and values of the
// `states` transition table.
const IDLE = 'IDLE';
const RUNNING = 'RUNNING';
const BAILING = 'BAILING';
const BAILED = 'BAILED';
const ABORTING = 'ABORTING';
const ABORTED = 'ABORTED';
const COMPLETE = 'COMPLETE';
|
|
|
/**
 * State transition table: each key is a state name and its `Set` lists the
 * states that may directly follow it. The `_state` setter of
 * `ParallelBufferedRunner` rejects any transition not listed here.
 */
const states = createMap({
  // initial state
  [IDLE]: new Set([RUNNING, ABORTING]),
  [RUNNING]: new Set([COMPLETE, BAILING, ABORTING]),
  // terminal states: nothing may follow
  [COMPLETE]: new Set(),
  [ABORTED]: new Set(),
  [ABORTING]: new Set([ABORTED]),
  [BAILING]: new Set([BAILED, ABORTING]),
  [BAILED]: new Set([COMPLETE, ABORTING])
});
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * A `Runner` that hands each test file to a `BufferedWorkerPool` and re-emits,
 * on itself, the events every worker buffered while running its file.
 */
class ParallelBufferedRunner extends Runner {
  /**
   * Forwards all arguments to `Runner` and installs the guarded `_state`
   * property, starting in `IDLE`.
   */
  constructor(...args) {
    super(...args);

    // The current state lives in this closure; `_state` is the only way to
    // read or change it, and every change is validated against `states`.
    let state = IDLE;
    Object.defineProperty(this, '_state', {
      get() {
        return state;
      },
      set(newState) {
        if (states[state].has(newState)) {
          state = newState;
        } else {
          throw new Error(`invalid state transition: ${state} => ${newState}`);
        }
      }
    });

    this._workerReporter = DEFAULT_WORKER_REPORTER;
    this._linkPartialObjects = false;
    // Maps a Mocha object ID to the single object instance kept for that ID.
    this._linkedObjectMap = new Map();

    this.once(EVENT_RUN_END, () => {
      this._state = COMPLETE;
    });
  }

  /**
   * Builds the function that runs a single file in the pool and replays the
   * resulting events on this runner.
   *
   * @param {object} pool - Worker pool; its `run(file, options)` must resolve
   *   to `{failureCount, events}` and it must provide `terminate()`.
   * @param {object} options - Options passed to `pool.run()` with every file.
   * @returns {function(string): Promise<void>} Async function taking a file.
   */
  _createFileRunner(pool, options) {
    /**
     * Re-emits one worker event and switches to `BAILING` when the event
     * data carries a `_bail` flag and the file had failures or the event has
     * an error.
     */
    const emitEvent = (event, failureCount) => {
      this.emit(event.eventName, event.data, event.error);
      if (
        this._state !== BAILING &&
        event.data &&
        event.data._bail &&
        (failureCount || event.error)
      ) {
        debug('run(): nonzero failure count & found bail flag');
        // The pool is terminated by the file runner once all events of the
        // current file have been emitted.
        this._state = BAILING;
      }
    };

    /**
     * Replaces `event.data`, and every nested object value carrying a Mocha
     * ID, with the instance stored for that ID in `_linkedObjectMap`, merging
     * the newly received properties into the stored instance.
     *
     * @throws when an object without a Mocha ID is found where one is
     *   expected (error created by `createFatalError`).
     */
    const linkEvent = event => {
      const stack = [{parent: event, prop: 'data'}];
      while (stack.length) {
        const {parent, prop} = stack.pop();
        const obj = parent[prop];
        if (!obj || typeof obj !== 'object') {
          // Nothing to link (e.g. an event without data); previously this
          // fell through to `Object.keys(undefined)` and threw a TypeError.
          continue;
        }

        const id = obj[MOCHA_ID_PROP_NAME];
        if (!id) {
          throw createFatalError(
            'Object missing ID received in event data',
            obj
          );
        }

        const newObj = this._linkedObjectMap.has(id)
          ? Object.assign(this._linkedObjectMap.get(id), obj)
          : obj;
        this._linkedObjectMap.set(id, newObj);
        parent[prop] = newObj;

        // Only values that came with this event (`obj[key]`) and carry an ID
        // are queued for linking.
        Object.keys(newObj).forEach(key => {
          const value = obj[key];
          if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) {
            stack.push({parent: newObj, prop: key});
          }
        });
      }
    };

    return async file => {
      debug('run(): enqueueing test file %s', file);
      try {
        const {failureCount, events} = await pool.run(file, options);

        if (this._state === BAILED) {
          // Another file already triggered a bail; drop this file's results.
          return;
        }
        debug(
          'run(): completed run of file %s; %d failures / %d events',
          file,
          failureCount,
          events.length
        );
        this.failures += failureCount;

        // Decide once per file whether objects are linked, then drain the
        // event queue in order.
        const shouldLink = this._linkPartialObjects;
        for (let event = events.shift(); event; event = events.shift()) {
          if (shouldLink) {
            linkEvent(event);
          }
          emitEvent(event, failureCount);
        }

        if (this._state === BAILING) {
          debug('run(): terminating pool due to "bail" flag');
          this._state = BAILED;
          await pool.terminate();
        }
      } catch (err) {
        if (this._state === BAILED || this._state === ABORTING) {
          // The pool was terminated on purpose; the rejection is expected.
          debug(
            'run(): worker pool terminated with intent; skipping file %s',
            file
          );
        } else {
          debug('run(): encountered uncaught exception: %O', err);
          if (this.allowUncaught) {
            // Stop everything as soon as possible before propagating.
            this._state = ABORTING;
            await pool.terminate(true);
          }
          throw err;
        }
      } finally {
        debug('run(): done running file %s', file);
      }
    };
  }

  /**
   * Registers a one-time `SIGINT` handler that moves to `ABORTING`,
   * force-terminates the pool, then moves to `ABORTED` and re-sends `SIGINT`
   * to the current process.
   *
   * @param {object} pool - Worker pool providing `terminate(force)`.
   * @returns {Function} The registered listener, so the caller can remove it.
   */
  _bindSigIntListener(pool) {
    const sigIntListener = async () => {
      debug('run(): caught a SIGINT');
      this._state = ABORTING;

      try {
        debug('run(): force-terminating worker pool');
        await pool.terminate(true);
      } catch (err) {
        console.error(
          `Error while attempting to force-terminate worker pool: ${err}`
        );
        process.exitCode = 1;
      } finally {
        process.nextTick(() => {
          debug('run(): imminent death');
          this._state = ABORTED;
          // This listener was registered with `once`, so it is no longer
          // attached when the signal is sent again.
          process.kill(process.pid, 'SIGINT');
        });
      }
    };

    process.once('SIGINT', sigIntListener);

    return sigIntListener;
  }

  /**
   * Runs the given files in a worker pool and re-emits their events.
   *
   * Returns immediately; the work happens asynchronously. Unless the run is
   * aborted, `EVENT_RUN_END` is emitted and `callback` is then invoked with
   * the accumulated failure count.
   *
   * @param {function(number): void} callback - Called with `this.failures`.
   * @param {object} [opts]
   * @param {string[]} opts.files - Files to run.
   * @param {object} [opts.options={}] - Options; `options.jobs` becomes the
   *   pool's `maxWorkers`, and the names in `DENY_OPTIONS` are removed before
   *   the rest is passed to the workers together with the worker reporter.
   * @returns {ParallelBufferedRunner} this
   */
  run(callback, {files, options = {}} = {}) {
    let sigIntListener;

    // Work on a copy so the caller's object is never mutated.
    const workerOptions = {...options, reporter: this._workerReporter};
    DENY_OPTIONS.forEach(opt => {
      delete workerOptions[opt];
    });

    (async () => {
      let debugInterval;

      try {
        const pool = BufferedWorkerPool.create({maxWorkers: options.jobs});

        sigIntListener = this._bindSigIntListener(pool);

        // `unref()` keeps this timer from holding the process open.
        debugInterval = setInterval(
          () => debugStats(pool),
          DEBUG_STATS_INTERVAL
        ).unref();

        this.started = true;
        this._state = RUNNING;

        this.emit(EVENT_RUN_BEGIN);

        const results = await allSettled(
          files.map(this._createFileRunner(pool, workerOptions))
        );

        await pool.terminate();

        results
          .filter(({status}) => status === 'rejected')
          .forEach(({reason}) => {
            if (this.allowUncaught) {
              // Handled by the surrounding `catch`, which rethrows it.
              throw reason;
            }
            this.uncaught(reason);
          });

        if (this._state === ABORTING) {
          return;
        }

        this.emit(EVENT_RUN_END);
        debug('run(): completing with failure count %d', this.failures);
        callback(this.failures);
      } catch (err) {
        // Rethrow outside of this async function so the error surfaces as an
        // uncaught exception instead of a promise rejection.
        process.nextTick(() => {
          debug('run(): re-throwing uncaught exception');
          throw err;
        });
      } finally {
        clearInterval(debugInterval);
        // The listener is unset if pool creation failed; `removeListener`
        // throws on a non-function argument, which would turn into an
        // unhandled rejection on top of the original error.
        if (sigIntListener) {
          process.removeListener('SIGINT', sigIntListener);
        }
      }
    })();

    return this;
  }

  /**
   * Enables or disables linking of objects received from workers (see
   * `_createFileRunner`) and delegates to `Runner#linkPartialObjects`.
   *
   * @param {boolean} [value] - Coerced with `Boolean()` for the local flag.
   * @returns {*} Whatever `Runner#linkPartialObjects` returns.
   */
  linkPartialObjects(value) {
    this._linkPartialObjects = Boolean(value);
    return super.linkPartialObjects(value);
  }

  /**
   * @returns {boolean} Always `true` for this runner.
   */
  isParallelMode() {
    return true;
  }

  /**
   * Sets the reporter that is sent to workers as the `reporter` option.
   *
   * @param {string} reporter - Stored as-is; the default is a resolved module
   *   path.
   * @returns {ParallelBufferedRunner} this
   */
  workerReporter(reporter) {
    this._workerReporter = reporter;
    return this;
  }
}
|
|
|
module.exports = ParallelBufferedRunner; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|