123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- /**
- * Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
- *
- * This source code is licensed under the MIT license found in the
- * LICENSE file in the root directory of this source tree.
- *
- *
- */
- 'use strict';
- Object.defineProperty(exports, '__esModule', {
- value: true
- });
- var _child_process;
- function _load_child_process() {
- return (_child_process = _interopRequireDefault(require('child_process')));
- }
- var _types;
- function _load_types() {
- return (_types = require('./types'));
- }
- function _interopRequireDefault(obj) {
- return obj && obj.__esModule ? obj : {default: obj};
- }
- /**
- * This class wraps the child process and provides a nice interface to
- * communicate with. It takes care of:
- *
- * - Re-spawning the process if it dies.
- * - Queues calls while the worker is busy.
- * - Re-sends the requests if the worker blew up.
- *
- * The reason for queueing them here (since childProcess.send also has an
- * internal queue) is because the worker could be doing asynchronous work, and
- * this would lead to the child process to read its receiving buffer and start a
- * second call. By queueing calls here, we don't send the next call to the
- * children until we receive the result of the previous one.
- *
- * As soon as a request starts to be processed by a worker, its "processed"
- * field is changed to "true", so that other workers which might encounter the
- * same call skip it.
- */
- exports.default = class {
- constructor(options) {
- this._options = options;
- this._queue = null;
- this._initialize();
- }
- getStdout() {
- return this._child.stdout;
- }
- getStderr() {
- return this._child.stderr;
- }
- send(request, onProcessStart, onProcessEnd) {
- const item = {next: null, onProcessEnd, onProcessStart, request};
- if (this._last) {
- this._last.next = item;
- } else {
- this._queue = item;
- }
- this._last = item;
- this._process();
- }
- _initialize() {
- const child = (_child_process || _load_child_process()).default.fork(
- require.resolve('./child'),
- // $FlowFixMe: Flow does not work well with Object.assign.
- Object.assign(
- {
- cwd: process.cwd(),
- env: Object.assign({}, process.env, {
- JEST_WORKER_ID: this._options.workerId
- }),
- // Suppress --debug / --inspect flags while preserving others (like --harmony).
- execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)),
- silent: true
- },
- this._options.forkOptions
- )
- );
- child.on('message', this._receive.bind(this));
- child.on('exit', this._exit.bind(this));
- // $FlowFixMe: wrong "ChildProcess.send" signature.
- child.send([
- (_types || _load_types()).CHILD_MESSAGE_INITIALIZE,
- false,
- this._options.workerPath
- ]);
- this._retries++;
- this._child = child;
- this._busy = false;
- // If we exceeded the amount of retries, we will emulate an error reply
- // coming from the child. This avoids code duplication related with cleaning
- // the queue, and scheduling the next call.
- if (this._retries > this._options.maxRetries) {
- const error = new Error('Call retries were exceeded');
- this._receive([
- (_types || _load_types()).PARENT_MESSAGE_ERROR,
- error.name,
- error.message,
- error.stack,
- {type: 'WorkerError'}
- ]);
- }
- }
- _process() {
- if (this._busy) {
- return;
- }
- let item = this._queue;
- // Calls in the queue might have already been processed by another worker,
- // so we have to skip them.
- while (item && item.request[1]) {
- item = item.next;
- }
- this._queue = item;
- if (item) {
- // Flag the call as processed, so that other workers know that they don't
- // have to process it as well.
- item.request[1] = true;
- // Tell the parent that this item is starting to be processed.
- item.onProcessStart(this);
- this._retries = 0;
- this._busy = true;
- // $FlowFixMe: wrong "ChildProcess.send" signature.
- this._child.send(item.request);
- } else {
- this._last = item;
- }
- }
- _receive(response /* Should be ParentMessage */) {
- const item = this._queue;
- if (!item) {
- throw new TypeError('Unexpected response with an empty queue');
- }
- const onProcessEnd = item.onProcessEnd;
- this._busy = false;
- this._process();
- switch (response[0]) {
- case (_types || _load_types()).PARENT_MESSAGE_OK:
- onProcessEnd(null, response[1]);
- break;
- case (_types || _load_types()).PARENT_MESSAGE_ERROR:
- let error = response[4];
- if (error != null && typeof error === 'object') {
- const extra = error;
- const NativeCtor = global[response[1]];
- const Ctor = typeof NativeCtor === 'function' ? NativeCtor : Error;
- error = new Ctor(response[2]);
- // $FlowFixMe: adding custom properties to errors.
- error.type = response[1];
- error.stack = response[3];
- for (const key in extra) {
- // $FlowFixMe: adding custom properties to errors.
- error[key] = extra[key];
- }
- }
- onProcessEnd(error, null);
- break;
- default:
- throw new TypeError('Unexpected response from worker: ' + response[0]);
- }
- }
- _exit(exitCode) {
- if (exitCode !== 0) {
- this._initialize();
- }
- }
- };
|