add WorkerPool wrapper, fix --grep
- created a wrapper around the `workerpool` module to decouple us from its API and give us a place to do serialization before invoking the worker's `run` method
- do not warn about dubious max worker count (`jobs` option) since it's not something we can be sure is an actual problem (make it debug output instead)
- adds [serialize-javascript](https://npm.im/serialize-javascript) module
    - allows easier transmission of non-JSON-compatible objects over IPC, e.g., a `RegExp` (for `--grep`); a sketch of this round trip follows the list
    - requires use of `eval()` to deserialize. I'm not too worried about this, but I think I need to play with it more
    - this avoids more custom serialization code, but is not especially helpful when serializing `Test`, `Suite` and `Hook` instances
- in the integration test helper code, if we provide an _absolute path_, do not make a guess about where the fixture is when running `runMocha` or `runMochaJSON`; this makes globs easier to use
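
As a rough sketch of the `serialize-javascript` round trip described above (not part of the diff; the `grep` pattern and `retries` value are invented for illustration, and the serializer flags mirror what the new `lib/pool.js` passes):

```js
const serializeJavascript = require('serialize-javascript');

// Illustrative options object; a real run would carry Mocha's parsed argv.
const options = {grep: /integration/i, retries: 1};

// Parent process: serialize before handing the options to a worker.
const payload = serializeJavascript(options, {
  unsafe: true, // output never reaches a browser, so skip XSS-safe escaping
  ignoreFunction: true // drop functions rather than serializing them
});
// payload is roughly: '{"grep":new RegExp("integration", "i"),"retries":1}'

// Worker process: deserialize by evaluating the string, as lib/worker.js does.
// eslint-disable-next-line no-eval
const revived = eval('(' + payload + ')');

console.log(revived.grep instanceof RegExp); // => true
console.log(revived.grep.test('integration test')); // => true
```

For comparison, `JSON.stringify(/integration/i)` yields `'{}'`, which is why a `RegExp` `--grep` value needs this extra serialization step to survive IPC.
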
boneskull committed May 6, 2020
1 parent 336226d commit 331fca9
Showing 14 changed files with 422 additions and 166 deletions.
1 change: 1 addition & 0 deletions .eslintrc.yml
@@ -31,6 +31,7 @@ overrides:
- 'lib/worker.js'
- 'lib/reporters/buffered.js'
- 'lib/serializer.js'
- 'lib/pool.js'
- 'test/reporters/buffered.spec.js'
parserOptions:
ecmaVersion: 2018
1 change: 1 addition & 0 deletions karma.conf.js
@@ -41,6 +41,7 @@ module.exports = config => {
.ignore('./lib/reporters/buffered.js')
.ignore('./lib/serializer.js')
.ignore('./lib/worker.js')
.ignore('./lib/pool.js')
.on('bundled', (err, content) => {
if (err) {
throw err;
44 changes: 10 additions & 34 deletions lib/buffered-runner.js
@@ -1,15 +1,17 @@
'use strict';

const allSettled = require('promise.allsettled');
const os = require('os');
const Runner = require('./runner');
const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
const debug = require('debug')('mocha:parallel:buffered-runner');
const workerpool = require('workerpool');
const {deserialize} = require('./serializer');
const WORKER_PATH = require.resolve('./worker.js');
const {WorkerPool} = require('./pool');
const {setInterval, clearInterval} = global;
const {createMap, warn} = require('./utils');
const {createMap} = require('./utils');

/**
* Outputs a debug statement with worker stats
* @param {WorkerPool} pool - Worker pool
*/
const debugStats = pool => {
const {totalWorkers, busyWorkers, idleWorkers, pendingTasks} = pool.stats();
debug(
@@ -108,36 +110,15 @@ class BufferedRunner extends Runner {
let pool;

try {
const cpuCount = os.cpus().length;
const maxJobs = cpuCount - 1;
const jobs = Math.max(1, Math.min(options.jobs || maxJobs, maxJobs));
if (maxJobs < 2) {
warn(
`(Mocha) not enough CPU cores available (${cpuCount}) to run multiple jobs; avoid --parallel on this machine`
);
} else if (options.jobs && options.jobs > maxJobs) {
warn(
`(Mocha) ${options.jobs} concurrent jobs requested, but only enough cores available for ${maxJobs}`
);
}
debug(
'run(): starting worker pool of size %d, using node args: %s',
jobs,
process.execArgv.join(' ')
);
pool = workerpool.pool(WORKER_PATH, {
workerType: 'process',
maxWorkers: jobs,
forkOpts: {execArgv: process.execArgv}
});
pool = WorkerPool.create({maxWorkers: options.jobs});

sigIntListener = async () => {
if (this._state !== ABORTING) {
debug('run(): caught a SIGINT');
this._state = ABORTING;

try {
debug('run(): shutting down %d (max) workers', jobs);
debug('run(): force-terminating worker pool');
await pool.terminate(true);
} catch (err) {
console.error(
@@ -155,10 +136,6 @@

process.once('SIGINT', sigIntListener);

// the "pool proxy" object is essentially just syntactic sugar to call a
// worker's procedure as one would a regular function.
const poolProxy = await pool.proxy();

debugInterval = setInterval(
() => debugStats(pool),
DEBUG_STATS_INTERVAL
@@ -174,12 +151,11 @@
files.map(async file => {
debug('run(): enqueueing test file %s', file);
try {
const result = await poolProxy.run(file, options);
const {failureCount, events} = await pool.run(file, options);
if (this._state === BAILED) {
// short-circuit after a graceful bail
return;
}
const {failureCount, events} = deserialize(result);
debug(
'run(): completed run of file %s; %d failures / %d events',
file,
163 changes: 163 additions & 0 deletions lib/pool.js
@@ -0,0 +1,163 @@
'use strict';

const serializeJavascript = require('serialize-javascript');
const workerpool = require('workerpool');
const {deserialize} = require('./serializer');
const debug = require('debug')('mocha:parallel:pool');
const {cpus} = require('os');
const {createInvalidArgumentTypeError} = require('./errors');

const WORKER_PATH = require.resolve('./worker.js');

/**
* A mapping of Mocha `Options` objects to serialized values.
*
* This is helpful because we tend to send the same options over and over
* over IPC.
* @type {WeakMap<Options,string>}
*/
let optionsCache = new WeakMap();

/**
* Count of CPU cores
*/
const CPU_COUNT = cpus().length;

/**
* Default max number of workers.
*
* We are already using one core for the main process, so assume only _n - 1_ are left.
*
* This is a reasonable default, but YMMV.
*/
const DEFAULT_MAX_WORKERS = CPU_COUNT - 1;

/**
* These options are passed into the [workerpool](https://npm.im/workerpool) module.
* @type {Partial<WorkerPoolOptions>}
*/
const WORKER_POOL_DEFAULT_OPTS = {
// use child processes, not worker threads!
workerType: 'process',
// ensure the same flags sent to `node` for this `mocha` invocation are passed
// along to children
forkOpts: {execArgv: process.execArgv},
maxWorkers: DEFAULT_MAX_WORKERS
};

/**
* A wrapper around a third-party worker pool implementation.
*/
class WorkerPool {
constructor(opts = WORKER_POOL_DEFAULT_OPTS) {
const maxWorkers = Math.max(1, opts.maxWorkers);

if (maxWorkers < 2) {
debug(
'not enough CPU cores available (%d) to run multiple jobs; avoid --parallel on this machine',
CPU_COUNT
);
} else if (maxWorkers >= CPU_COUNT) {
debug(
'%d concurrent job(s) requested, but only %d core(s) available',
maxWorkers,
CPU_COUNT
);
}
debug(
'run(): starting worker pool of max size %d, using node args: %s',
maxWorkers,
process.execArgv.join(' ')
);

this.options = Object.assign({}, opts, {maxWorkers});
this._pool = workerpool.pool(WORKER_PATH, this.options);
}

/**
* Terminates all workers in the pool.
* @param {boolean} [force] - Whether to force-kill workers. By default, lets workers finish their current task before termination.
* @private
* @returns {Promise<void>}
*/
async terminate(force = false) {
return this._pool.terminate(force);
}

/**
* Adds a test file run to the worker pool queue for execution by a worker process.
*
* Handles serialization/deserialization.
*
* @param {string} filepath - Filepath of test
* @param {Options} [options] - Options for Mocha instance
* @private
* @returns {Promise<SerializedWorkerResult>}
*/
async run(filepath, options = {}) {
if (!filepath || typeof filepath !== 'string') {
throw createInvalidArgumentTypeError(
'Expected a non-empty filepath',
'filepath',
'string'
);
}
const serializedOptions = WorkerPool.serializeOptions(options);
const result = await this._pool.exec('run', [filepath, serializedOptions]);
return deserialize(result);
}

/**
* Returns stats about the state of the worker processes in the pool.
*
* Used for debugging.
*
* @private
*/
stats() {
return this._pool.stats();
}

/**
* Instantiates a {@link WorkerPool}.
*/
static create(...args) {
return new WorkerPool(...args);
}

/**
* Given Mocha options object `opts`, serialize into a format suitable for
* transmission over IPC.
*
* @param {Options} [opts] - Mocha options
* @private
* @returns {string} Serialized options
*/
static serializeOptions(opts = {}) {
if (!optionsCache.has(opts)) {
const serialized = serializeJavascript(opts, {
unsafe: true, // this means we don't care about XSS
ignoreFunction: true // do not serialize functions
});
optionsCache.set(opts, serialized);
debug(
'serializeOptions(): serialized options %O to: %s',
opts,
serialized
);
}
return optionsCache.get(opts);
}

/**
* Resets internal cache of serialized options objects.
*
* For testing/debugging
* @private
*/
static resetOptionsCache() {
optionsCache = new WeakMap();
}
}

exports.WorkerPool = WorkerPool;
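
For orientation, here is a hypothetical stand-alone use of the wrapper above, roughly mirroring what the `lib/buffered-runner.js` hunk now does; the spec path, `maxWorkers` value, and options are invented for the example and are not taken from the diff:

```js
const {WorkerPool} = require('./lib/pool');

(async () => {
  // BufferedRunner passes {maxWorkers: options.jobs}; 2 is just a stand-in here.
  const pool = WorkerPool.create({maxWorkers: 2});
  try {
    // run() serializes the options, sends them to a worker process over IPC,
    // and deserializes the worker's result before returning it.
    const {failureCount, events} = await pool.run('test/example.spec.js', {
      ui: 'bdd',
      grep: /smoke/
    });
    console.log('failures: %d, buffered events: %d', failureCount, events.length);
  } finally {
    // Let in-flight work finish; pass `true` to force-terminate, as the
    // SIGINT handler in buffered-runner.js does.
    await pool.terminate();
  }
})();
```

Keeping the serialize/deserialize calls inside the wrapper is what lets `BufferedRunner` treat `pool.run()` like a plain async function, which is the decoupling the first bullet of the commit message refers to.
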
5 changes: 3 additions & 2 deletions lib/test.js
@@ -72,7 +72,7 @@ Test.prototype.clone = function() {
};

/**
* Returns an object suitable for IPC.
* Returns a minimal object suitable for transmission over IPC.
* Functions are represented by keys beginning with `$$`.
* @returns {Object}
*/
@@ -93,6 +93,7 @@ Test.prototype.serialize = function serialize() {
speed: this.speed,
state: this.state,
title: this.title,
type: this.type
type: this.type,
file: this.file
};
};
44 changes: 33 additions & 11 deletions lib/worker.js
@@ -1,13 +1,12 @@
'use strict';

const {createInvalidArgumentTypeError} = require('./errors');
const {
createInvalidArgumentTypeError,
createInvalidArgumentValueError
} = require('./errors');
const workerpool = require('workerpool');
const Mocha = require('./mocha');
const {
handleRequires,
validatePlugin,
loadRootHooks
} = require('./cli/run-helpers');
const {handleRequires, validatePlugin} = require('./cli/run-helpers');
const debug = require('debug')(`mocha:parallel:worker:${process.pid}`);
const {serialize} = require('./serializer');
const {setInterval, clearInterval} = global;
@@ -34,20 +33,23 @@ if (workerpool.isMainThread) {
* @param {Options} argv - Command-line options
*/
let bootstrap = async argv => {
const rawRootHooks = handleRequires(argv.require);
rootHooks = await loadRootHooks(rawRootHooks);
handleRequires(argv.require);
// const rawRootHooks = handleRequires(argv.require);
// rootHooks = await loadRootHooks(rawRootHooks);
validatePlugin(argv, 'ui', Mocha.interfaces);
bootstrap = () => {};
debug('bootstrap(): finished with args: %O', argv);
};

/**
* Runs a single test file in a worker thread.
* @param {string} filepath - Filepath of test file
* @param {Options} [argv] - Parsed command-line options object
* @param {string} [serializedOptions] - **Serialized** options. This string will be eval'd!
* @see https://npm.im/serialize-javascript
* @returns {Promise<{failures: number, events: BufferedEvent[]}>} - Test
* failure count and list of events.
*/
async function run(filepath, argv = {ui: 'bdd'}) {
async function run(filepath, serializedOptions = '{}') {
if (!filepath) {
throw createInvalidArgumentTypeError(
'Expected a non-empty "filepath" argument',
@@ -58,7 +60,27 @@ async function run(filepath, argv = {ui: 'bdd'}) {

debug('run(): running test file %s', filepath);

const opts = Object.assign(argv, {
if (typeof serializedOptions !== 'string') {
throw createInvalidArgumentTypeError(
'run() expects second parameter to be a string which was serialized by the `serialize-javascript` module',
'serializedOptions',
'string'
);
}
let argv;
try {
// eslint-disable-next-line no-eval
argv = eval('(' + serializedOptions + ')');
} catch (err) {
throw createInvalidArgumentValueError(
'run() was unable to deserialize the options',
'serializedOptions',
serializedOptions
);
}

debug('run(): deserialized options to %O', argv);
const opts = Object.assign({ui: 'bdd'}, argv, {
// workers only use the `Buffered` reporter.
reporter: BUFFERED_REPORTER_PATH,
// if this was true, it would cause infinite recursion.
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package-scripts.js
@@ -32,7 +32,7 @@ function test(testName, mochaParams) {
module.exports = {
scripts: {
build: {
script: `browserify -e browser-entry.js --plugin ./scripts/dedefine --ignore './lib/cli/*.js' --ignore "./lib/esm-utils.js" --ignore 'chokidar' --ignore 'fs' --ignore 'glob' --ignore 'path' --ignore 'supports-color' --ignore './lib/buffered-runner.js' --ignore './lib/serializer.js' --ignore './lib/reporters/buffered.js' --ignore './lib/worker.js' -o mocha.js`,
script: `browserify -e browser-entry.js --plugin ./scripts/dedefine --ignore './lib/cli/*.js' --ignore "./lib/esm-utils.js" --ignore 'chokidar' --ignore 'fs' --ignore 'glob' --ignore 'path' --ignore 'supports-color' --ignore './lib/buffered-runner.js' --ignore './lib/serializer.js' --ignore './lib/reporters/buffered.js' --ignore './lib/worker.js' --ignore './lib/pool.js' -o mocha.js`,
description: 'Build browser bundle'
},
lint: {
4 changes: 3 additions & 1 deletion package.json
@@ -67,6 +67,7 @@
"ms": "2.1.2",
"object.assign": "4.1.0",
"promise.allsettled": "1.0.2",
"serialize-javascript": "3.0.0",
"strip-json-comments": "3.0.1",
"supports-color": "7.1.0",
"which": "2.0.2",
@@ -164,7 +165,8 @@
"./lib/serializer.js": false,
"./lib/reporters/buffered.js": false,
"./lib/buffered-reporter.js": false,
"./lib/worker.js": false
"./lib/worker.js": false,
"./lib/pool.js": false
},
"prettier": {
"singleQuote": true,