node/lib/internal/main/worker_thread.js
Joyee Cheung f14d2e5a0c
loader: use default loader as cascaded loader in the in loader worker
Use the default loader as the cascaded loader in the loader worker.
Otherwise we spawn loader workers in the loader workers indefinitely.

PR-URL: https://github.com/nodejs/node/pull/47620
Fixes: https://github.com/nodejs/node/issues/47566
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Jacob Smith <jacob@frende.me>
Reviewed-By: Geoffrey Booth <webadmin@geoffreybooth.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
2023-04-21 12:27:54 +00:00

264 lines
7.1 KiB
JavaScript

'use strict';
// In worker threads, execute the script sent through the
// message port.
const {
ArrayPrototypeForEach,
ArrayPrototypePushApply,
ArrayPrototypeSplice,
ObjectDefineProperty,
PromisePrototypeThen,
RegExpPrototypeExec,
SafeWeakMap,
globalThis: {
Atomics,
SharedArrayBuffer,
},
} = primordials;
const {
prepareWorkerThreadExecution,
setupUserModules,
markBootstrapComplete,
} = require('internal/process/pre_execution');
const {
threadId,
getEnvMessagePort,
} = internalBinding('worker');
const workerIo = require('internal/worker/io');
const {
messageTypes: {
// Messages that may be received by workers
LOAD_SCRIPT,
// Messages that may be posted from workers
UP_AND_RUNNING,
ERROR_MESSAGE,
COULD_NOT_SERIALIZE_ERROR,
// Messages that may be either received or posted
STDIO_PAYLOAD,
STDIO_WANTS_MORE_DATA,
},
kStdioWantsMoreDataCallback,
} = workerIo;
const {
onGlobalUncaughtException,
} = require('internal/process/execution');
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
debug = fn;
});
const assert = require('internal/assert');
const { exitCodes: { kGenericUserError } } = internalBinding('errors');
prepareWorkerThreadExecution();
debug(`[${threadId}] is setting up worker child environment`);
// Set up the message port and start listening
const port = getEnvMessagePort();
// If the main thread is spawned with env NODE_CHANNEL_FD, it's probably
// spawned by our child_process module. In the work threads, mark the
// related IPC properties as unavailable.
if (process.env.NODE_CHANNEL_FD) {
const workerThreadSetup = require('internal/process/worker_thread_only');
ObjectDefineProperty(process, 'channel', {
__proto__: null,
enumerable: false,
get: workerThreadSetup.unavailable('process.channel'),
});
ObjectDefineProperty(process, 'connected', {
__proto__: null,
enumerable: false,
get: workerThreadSetup.unavailable('process.connected'),
});
process.send = workerThreadSetup.unavailable('process.send()');
process.disconnect =
workerThreadSetup.unavailable('process.disconnect()');
}
port.on('message', (message) => {
if (message.type === LOAD_SCRIPT) {
port.unref();
const {
argv,
cwdCounter,
doEval,
environmentData,
filename,
hasStdin,
manifestSrc,
manifestURL,
publicPort,
workerData,
} = message;
if (doEval !== 'internal') {
if (argv !== undefined) {
ArrayPrototypePushApply(process.argv, argv);
}
const publicWorker = require('worker_threads');
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;
}
require('internal/worker').assignEnvironmentData(environmentData);
if (SharedArrayBuffer !== undefined && Atomics !== undefined) {
// The counter is only passed to the workers created by the main thread,
// not to workers created by other workers.
let cachedCwd = '';
let lastCounter = -1;
const originalCwd = process.cwd;
process.cwd = function() {
const currentCounter = Atomics.load(cwdCounter, 0);
if (currentCounter === lastCounter)
return cachedCwd;
lastCounter = currentCounter;
cachedCwd = originalCwd();
return cachedCwd;
};
workerIo.sharedCwdCounter = cwdCounter;
}
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
const isLoaderWorker =
doEval === 'internal' &&
filename === require('internal/modules/esm/utils').loaderWorkerId;
setupUserModules(isLoaderWorker);
if (!hasStdin)
process.stdin.push(null);
debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${doEval}) at cwd = ${process.cwd()}`);
port.postMessage({ type: UP_AND_RUNNING });
switch (doEval) {
case 'internal': {
// Create this WeakMap in js-land because V8 has no C++ API for WeakMap.
internalBinding('module_wrap').callbackMap = new SafeWeakMap();
require(filename)(workerData, publicPort);
break;
}
case 'classic': {
const { evalScript } = require('internal/process/execution');
const name = '[worker eval]';
// This is necessary for CJS module compilation.
// TODO: pass this with something really internal.
ObjectDefineProperty(process, '_eval', {
__proto__: null,
configurable: true,
enumerable: true,
value: filename,
});
ArrayPrototypeSplice(process.argv, 1, 0, name);
evalScript(name, filename);
break;
}
case 'module': {
const { evalModule } = require('internal/process/execution');
PromisePrototypeThen(evalModule(filename), undefined, (e) => {
workerOnGlobalUncaughtException(e, true);
});
break;
}
default: {
// script filename
// runMain here might be monkey-patched by users in --require.
// XXX: the monkey-patchability here should probably be deprecated.
ArrayPrototypeSplice(process.argv, 1, 0, filename);
const CJSLoader = require('internal/modules/cjs/loader');
CJSLoader.Module.runMain(filename);
break;
}
}
} else if (message.type === STDIO_PAYLOAD) {
const { stream, chunks } = message;
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
process[stream].push(chunk, encoding);
});
} else {
assert(
message.type === STDIO_WANTS_MORE_DATA,
`Unknown worker message type ${message.type}`,
);
const { stream } = message;
process[stream][kStdioWantsMoreDataCallback]();
}
});
function workerOnGlobalUncaughtException(error, fromPromise) {
debug(`[${threadId}] gets uncaught exception`);
let handled = false;
let handlerThrew = false;
try {
handled = onGlobalUncaughtException(error, fromPromise);
} catch (e) {
error = e;
handlerThrew = true;
}
debug(`[${threadId}] uncaught exception handled = ${handled}`);
if (handled) {
return true;
}
if (!process._exiting) {
try {
process._exiting = true;
process.exitCode = kGenericUserError;
if (!handlerThrew) {
process.emit('exit', process.exitCode);
}
} catch {
// Continue regardless of error.
}
}
let serialized;
try {
const { serializeError } = require('internal/error_serdes');
serialized = serializeError(error);
} catch {
// Continue regardless of error.
}
debug(`[${threadId}] uncaught exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: ERROR_MESSAGE,
error: serialized,
});
else
port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });
const { clearAsyncIdStack } = require('internal/async_hooks');
clearAsyncIdStack();
process.exit();
}
// Patch the global uncaught exception handler so it gets picked up by
// node::errors::TriggerUncaughtException().
process._fatalException = workerOnGlobalUncaughtException;
markBootstrapComplete();
// Necessary to reset RegExp statics before user code runs.
RegExpPrototypeExec(/^/, '');
port.start();