mirror of
https://github.com/nodejs/node.git
synced 2025-05-06 23:10:15 +00:00
child_process,cluster: allow using V8 serialization API
Add an `serialization` option that allows child process IPC to use the (typically more powerful) V8 serialization API. Fixes: https://github.com/nodejs/node/issues/10965 PR-URL: https://github.com/nodejs/node/pull/30162 Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: David Carlier <devnexen@gmail.com> Reviewed-By: Michaël Zasso <targos@protonmail.com>
This commit is contained in:
parent
f17e414dc4
commit
973f324463
@ -7,16 +7,25 @@ if (cluster.isMaster) {
|
|||||||
workers: [1],
|
workers: [1],
|
||||||
payload: ['string', 'object'],
|
payload: ['string', 'object'],
|
||||||
sendsPerBroadcast: [1, 10],
|
sendsPerBroadcast: [1, 10],
|
||||||
|
serialization: ['json', 'advanced'],
|
||||||
n: [1e5]
|
n: [1e5]
|
||||||
});
|
});
|
||||||
|
|
||||||
function main({ n, workers, sendsPerBroadcast, payload }) {
|
function main({
|
||||||
|
n,
|
||||||
|
workers,
|
||||||
|
sendsPerBroadcast,
|
||||||
|
payload,
|
||||||
|
serialization
|
||||||
|
}) {
|
||||||
const expectedPerBroadcast = sendsPerBroadcast * workers;
|
const expectedPerBroadcast = sendsPerBroadcast * workers;
|
||||||
var readies = 0;
|
var readies = 0;
|
||||||
var broadcasts = 0;
|
var broadcasts = 0;
|
||||||
var msgCount = 0;
|
var msgCount = 0;
|
||||||
var data;
|
var data;
|
||||||
|
|
||||||
|
cluster.settings.serialization = serialization;
|
||||||
|
|
||||||
switch (payload) {
|
switch (payload) {
|
||||||
case 'string':
|
case 'string':
|
||||||
data = 'hello world!';
|
data = 'hello world!';
|
||||||
|
@ -321,6 +321,9 @@ arbitrary command execution.**
|
|||||||
<!-- YAML
|
<!-- YAML
|
||||||
added: v0.5.0
|
added: v0.5.0
|
||||||
changes:
|
changes:
|
||||||
|
- version: REPLACEME
|
||||||
|
pr-url: https://github.com/nodejs/node/pull/30162
|
||||||
|
description: The `serialization` option is supported now.
|
||||||
- version: v8.0.0
|
- version: v8.0.0
|
||||||
pr-url: https://github.com/nodejs/node/pull/10866
|
pr-url: https://github.com/nodejs/node/pull/10866
|
||||||
description: The `stdio` option can now be a string.
|
description: The `stdio` option can now be a string.
|
||||||
@ -340,6 +343,9 @@ changes:
|
|||||||
* `execPath` {string} Executable used to create the child process.
|
* `execPath` {string} Executable used to create the child process.
|
||||||
* `execArgv` {string[]} List of string arguments passed to the executable.
|
* `execArgv` {string[]} List of string arguments passed to the executable.
|
||||||
**Default:** `process.execArgv`.
|
**Default:** `process.execArgv`.
|
||||||
|
* `serialization` {string} Specify the kind of serialization used for sending
|
||||||
|
messages between processes. Possible values are `'json'` and `'advanced'`.
|
||||||
|
See [Advanced Serialization][] for more details. **Default:** `'json'`.
|
||||||
* `silent` {boolean} If `true`, stdin, stdout, and stderr of the child will be
|
* `silent` {boolean} If `true`, stdin, stdout, and stderr of the child will be
|
||||||
piped to the parent, otherwise they will be inherited from the parent, see
|
piped to the parent, otherwise they will be inherited from the parent, see
|
||||||
the `'pipe'` and `'inherit'` options for [`child_process.spawn()`][]'s
|
the `'pipe'` and `'inherit'` options for [`child_process.spawn()`][]'s
|
||||||
@ -386,6 +392,9 @@ The `shell` option available in [`child_process.spawn()`][] is not supported by
|
|||||||
<!-- YAML
|
<!-- YAML
|
||||||
added: v0.1.90
|
added: v0.1.90
|
||||||
changes:
|
changes:
|
||||||
|
- version: REPLACEME
|
||||||
|
pr-url: https://github.com/nodejs/node/pull/30162
|
||||||
|
description: The `serialization` option is supported now.
|
||||||
- version: v8.8.0
|
- version: v8.8.0
|
||||||
pr-url: https://github.com/nodejs/node/pull/15380
|
pr-url: https://github.com/nodejs/node/pull/15380
|
||||||
description: The `windowsHide` option is supported now.
|
description: The `windowsHide` option is supported now.
|
||||||
@ -411,6 +420,9 @@ changes:
|
|||||||
[`options.detached`][]).
|
[`options.detached`][]).
|
||||||
* `uid` {number} Sets the user identity of the process (see setuid(2)).
|
* `uid` {number} Sets the user identity of the process (see setuid(2)).
|
||||||
* `gid` {number} Sets the group identity of the process (see setgid(2)).
|
* `gid` {number} Sets the group identity of the process (see setgid(2)).
|
||||||
|
* `serialization` {string} Specify the kind of serialization used for sending
|
||||||
|
messages between processes. Possible values are `'json'` and `'advanced'`.
|
||||||
|
See [Advanced Serialization][] for more details. **Default:** `'json'`.
|
||||||
* `shell` {boolean|string} If `true`, runs `command` inside of a shell. Uses
|
* `shell` {boolean|string} If `true`, runs `command` inside of a shell. Uses
|
||||||
`'/bin/sh'` on Unix, and `process.env.ComSpec` on Windows. A different
|
`'/bin/sh'` on Unix, and `process.env.ComSpec` on Windows. A different
|
||||||
shell can be specified as a string. See [Shell Requirements][] and
|
shell can be specified as a string. See [Shell Requirements][] and
|
||||||
@ -998,6 +1010,11 @@ The `'message'` event is triggered when a child process uses
|
|||||||
The message goes through serialization and parsing. The resulting
|
The message goes through serialization and parsing. The resulting
|
||||||
message might not be the same as what is originally sent.
|
message might not be the same as what is originally sent.
|
||||||
|
|
||||||
|
If the `serialization` option was set to `'advanced'` used when spawning the
|
||||||
|
child process, the `message` argument can contain data that JSON is not able
|
||||||
|
to represent.
|
||||||
|
See [Advanced Serialization][] for more details.
|
||||||
|
|
||||||
### subprocess.channel
|
### subprocess.channel
|
||||||
<!-- YAML
|
<!-- YAML
|
||||||
added: v7.1.0
|
added: v7.1.0
|
||||||
@ -1472,6 +1489,26 @@ the same requirement. Thus, in `child_process` functions where a shell can be
|
|||||||
spawned, `'cmd.exe'` is used as a fallback if `process.env.ComSpec` is
|
spawned, `'cmd.exe'` is used as a fallback if `process.env.ComSpec` is
|
||||||
unavailable.
|
unavailable.
|
||||||
|
|
||||||
|
## Advanced Serialization
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
Child processes support a serialization mechanism for IPC that is based on the
|
||||||
|
[serialization API of the `v8` module][v8.serdes], based on the
|
||||||
|
[HTML structured clone algorithm][]. This is generally more powerful and
|
||||||
|
supports more built-in JavaScript object types, such as `BigInt`, `Map`
|
||||||
|
and `Set`, `ArrayBuffer` and `TypedArray`, `Buffer`, `Error`, `RegExp` etc.
|
||||||
|
|
||||||
|
However, this format is not a full superset of JSON, and e.g. properties set on
|
||||||
|
objects of such built-in types will not be passed on through the serialization
|
||||||
|
step. Additionally, performance may not be equivalent to that of JSON, depending
|
||||||
|
on the structure of the passed data.
|
||||||
|
Therefore, this feature requires opting in by setting the
|
||||||
|
`serialization` option to `'advanced'` when calling [`child_process.spawn()`][]
|
||||||
|
or [`child_process.fork()`][].
|
||||||
|
|
||||||
|
[Advanced Serialization]: #child_process_advanced_serialization
|
||||||
[`'disconnect'`]: process.html#process_event_disconnect
|
[`'disconnect'`]: process.html#process_event_disconnect
|
||||||
[`'error'`]: #child_process_event_error
|
[`'error'`]: #child_process_event_error
|
||||||
[`'exit'`]: #child_process_event_exit
|
[`'exit'`]: #child_process_event_exit
|
||||||
@ -1505,5 +1542,7 @@ unavailable.
|
|||||||
[`subprocess.stdout`]: #child_process_subprocess_stdout
|
[`subprocess.stdout`]: #child_process_subprocess_stdout
|
||||||
[`util.promisify()`]: util.html#util_util_promisify_original
|
[`util.promisify()`]: util.html#util_util_promisify_original
|
||||||
[Default Windows Shell]: #child_process_default_windows_shell
|
[Default Windows Shell]: #child_process_default_windows_shell
|
||||||
|
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
|
||||||
[Shell Requirements]: #child_process_shell_requirements
|
[Shell Requirements]: #child_process_shell_requirements
|
||||||
[synchronous counterparts]: #child_process_synchronous_process_creation
|
[synchronous counterparts]: #child_process_synchronous_process_creation
|
||||||
|
[v8.serdes]: v8.html#v8_serialization_api
|
||||||
|
@ -724,6 +724,9 @@ values are `'rr'` and `'none'`.
|
|||||||
<!-- YAML
|
<!-- YAML
|
||||||
added: v0.7.1
|
added: v0.7.1
|
||||||
changes:
|
changes:
|
||||||
|
- version: REPLACEME
|
||||||
|
pr-url: https://github.com/nodejs/node/pull/30162
|
||||||
|
description: The `serialization` option is supported now.
|
||||||
- version: v9.5.0
|
- version: v9.5.0
|
||||||
pr-url: https://github.com/nodejs/node/pull/18399
|
pr-url: https://github.com/nodejs/node/pull/18399
|
||||||
description: The `cwd` option is supported now.
|
description: The `cwd` option is supported now.
|
||||||
@ -746,6 +749,10 @@ changes:
|
|||||||
**Default:** `process.argv.slice(2)`.
|
**Default:** `process.argv.slice(2)`.
|
||||||
* `cwd` {string} Current working directory of the worker process. **Default:**
|
* `cwd` {string} Current working directory of the worker process. **Default:**
|
||||||
`undefined` (inherits from parent process).
|
`undefined` (inherits from parent process).
|
||||||
|
* `serialization` {string} Specify the kind of serialization used for sending
|
||||||
|
messages between processes. Possible values are `'json'` and `'advanced'`.
|
||||||
|
See [Advanced Serialization for `child_process`][] for more details.
|
||||||
|
**Default:** `false`.
|
||||||
* `silent` {boolean} Whether or not to send output to parent's stdio.
|
* `silent` {boolean} Whether or not to send output to parent's stdio.
|
||||||
**Default:** `false`.
|
**Default:** `false`.
|
||||||
* `stdio` {Array} Configures the stdio of forked processes. Because the
|
* `stdio` {Array} Configures the stdio of forked processes. Because the
|
||||||
@ -874,4 +881,5 @@ socket.on('data', (id) => {
|
|||||||
[`process` event: `'message'`]: process.html#process_event_message
|
[`process` event: `'message'`]: process.html#process_event_message
|
||||||
[`server.close()`]: net.html#net_event_close
|
[`server.close()`]: net.html#net_event_close
|
||||||
[`worker.exitedAfterDisconnect`]: #cluster_worker_exitedafterdisconnect
|
[`worker.exitedAfterDisconnect`]: #cluster_worker_exitedafterdisconnect
|
||||||
|
[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
|
||||||
[Child Process module]: child_process.html#child_process_child_process_fork_modulepath_args_options
|
[Child Process module]: child_process.html#child_process_child_process_fork_modulepath_args_options
|
||||||
|
@ -119,6 +119,11 @@ the child process.
|
|||||||
The message goes through serialization and parsing. The resulting message might
|
The message goes through serialization and parsing. The resulting message might
|
||||||
not be the same as what is originally sent.
|
not be the same as what is originally sent.
|
||||||
|
|
||||||
|
If the `serialization` option was set to `advanced` used when spawning the
|
||||||
|
process, the `message` argument can contain data that JSON is not able
|
||||||
|
to represent.
|
||||||
|
See [Advanced Serialization for `child_process`][] for more details.
|
||||||
|
|
||||||
### Event: 'multipleResolves'
|
### Event: 'multipleResolves'
|
||||||
<!-- YAML
|
<!-- YAML
|
||||||
added: v10.12.0
|
added: v10.12.0
|
||||||
@ -2456,6 +2461,7 @@ cases:
|
|||||||
[`require.resolve()`]: modules.html#modules_require_resolve_request_options
|
[`require.resolve()`]: modules.html#modules_require_resolve_request_options
|
||||||
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
|
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
|
||||||
[`v8.setFlagsFromString()`]: v8.html#v8_v8_setflagsfromstring_flags
|
[`v8.setFlagsFromString()`]: v8.html#v8_v8_setflagsfromstring_flags
|
||||||
|
[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
|
||||||
[Android building]: https://github.com/nodejs/node/blob/master/BUILDING.md#androidandroid-based-devices-eg-firefox-os
|
[Android building]: https://github.com/nodejs/node/blob/master/BUILDING.md#androidandroid-based-devices-eg-firefox-os
|
||||||
[Child Process]: child_process.html
|
[Child Process]: child_process.html
|
||||||
[Cluster]: cluster.html
|
[Cluster]: cluster.html
|
||||||
|
@ -108,12 +108,12 @@ function fork(modulePath /* , args, options */) {
|
|||||||
return spawn(options.execPath, args, options);
|
return spawn(options.execPath, args, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
function _forkChild(fd) {
|
function _forkChild(fd, serializationMode) {
|
||||||
// set process.send()
|
// set process.send()
|
||||||
const p = new Pipe(PipeConstants.IPC);
|
const p = new Pipe(PipeConstants.IPC);
|
||||||
p.open(fd);
|
p.open(fd);
|
||||||
p.unref();
|
p.unref();
|
||||||
const control = setupChannel(process, p);
|
const control = setupChannel(process, p, serializationMode);
|
||||||
process.on('newListener', function onNewListener(name) {
|
process.on('newListener', function onNewListener(name) {
|
||||||
if (name === 'message' || name === 'disconnect') control.ref();
|
if (name === 'message' || name === 'disconnect') control.ref();
|
||||||
});
|
});
|
||||||
|
@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() {
|
|||||||
// Make sure it's not accidentally inherited by child processes.
|
// Make sure it's not accidentally inherited by child processes.
|
||||||
delete process.env.NODE_CHANNEL_FD;
|
delete process.env.NODE_CHANNEL_FD;
|
||||||
|
|
||||||
require('child_process')._forkChild(fd);
|
const serializationMode =
|
||||||
|
process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json';
|
||||||
|
delete process.env.NODE_CHANNEL_SERIALIZATION_MODE;
|
||||||
|
|
||||||
|
require('child_process')._forkChild(fd, serializationMode);
|
||||||
assert(process.send);
|
assert(process.send);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
const { JSON, Object } = primordials;
|
const { Object } = primordials;
|
||||||
|
|
||||||
const {
|
const {
|
||||||
errnoException,
|
errnoException,
|
||||||
@ -55,8 +55,6 @@ const {
|
|||||||
|
|
||||||
const { SocketListSend, SocketListReceive } = SocketList;
|
const { SocketListSend, SocketListReceive } = SocketList;
|
||||||
|
|
||||||
// Lazy loaded for startup performance.
|
|
||||||
let StringDecoder;
|
|
||||||
// Lazy loaded for startup performance and to allow monkey patching of
|
// Lazy loaded for startup performance and to allow monkey patching of
|
||||||
// internalBinding('http_parser').HTTPParser.
|
// internalBinding('http_parser').HTTPParser.
|
||||||
let freeParser;
|
let freeParser;
|
||||||
@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) {
|
|||||||
const ipcFd = stdio.ipcFd;
|
const ipcFd = stdio.ipcFd;
|
||||||
stdio = options.stdio = stdio.stdio;
|
stdio = options.stdio = stdio.stdio;
|
||||||
|
|
||||||
|
if (options.serialization !== undefined &&
|
||||||
|
options.serialization !== 'json' &&
|
||||||
|
options.serialization !== 'advanced') {
|
||||||
|
throw new ERR_INVALID_OPT_VALUE('options.serialization',
|
||||||
|
options.serialization);
|
||||||
|
}
|
||||||
|
|
||||||
|
const serialization = options.serialization || 'json';
|
||||||
|
|
||||||
if (ipc !== undefined) {
|
if (ipc !== undefined) {
|
||||||
// Let child process know about opened IPC channel
|
// Let child process know about opened IPC channel
|
||||||
if (options.envPairs === undefined)
|
if (options.envPairs === undefined)
|
||||||
@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) {
|
|||||||
options.envPairs);
|
options.envPairs);
|
||||||
}
|
}
|
||||||
|
|
||||||
options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
|
options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
|
||||||
|
options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
validateString(options.file, 'options.file');
|
validateString(options.file, 'options.file');
|
||||||
@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) {
|
|||||||
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
|
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
|
||||||
|
|
||||||
// Add .send() method and start listening for IPC data
|
// Add .send() method and start listening for IPC data
|
||||||
if (ipc !== undefined) setupChannel(this, ipc);
|
if (ipc !== undefined) setupChannel(this, ipc, serialization);
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
};
|
};
|
||||||
@ -516,7 +524,8 @@ class Control extends EventEmitter {
|
|||||||
const channelDeprecationMsg = '_channel is deprecated. ' +
|
const channelDeprecationMsg = '_channel is deprecated. ' +
|
||||||
'Use ChildProcess.channel instead.';
|
'Use ChildProcess.channel instead.';
|
||||||
|
|
||||||
function setupChannel(target, channel) {
|
let serialization;
|
||||||
|
function setupChannel(target, channel, serializationMode) {
|
||||||
target.channel = channel;
|
target.channel = channel;
|
||||||
|
|
||||||
Object.defineProperty(target, '_channel', {
|
Object.defineProperty(target, '_channel', {
|
||||||
@ -535,12 +544,16 @@ function setupChannel(target, channel) {
|
|||||||
|
|
||||||
const control = new Control(channel);
|
const control = new Control(channel);
|
||||||
|
|
||||||
if (StringDecoder === undefined)
|
if (serialization === undefined)
|
||||||
StringDecoder = require('string_decoder').StringDecoder;
|
serialization = require('internal/child_process/serialization');
|
||||||
const decoder = new StringDecoder('utf8');
|
const {
|
||||||
var jsonBuffer = '';
|
initMessageChannel,
|
||||||
var pendingHandle = null;
|
parseChannelMessages,
|
||||||
channel.buffering = false;
|
writeChannelMessage
|
||||||
|
} = serialization[serializationMode];
|
||||||
|
|
||||||
|
let pendingHandle = null;
|
||||||
|
initMessageChannel(channel);
|
||||||
channel.pendingHandle = null;
|
channel.pendingHandle = null;
|
||||||
channel.onread = function(arrayBuffer) {
|
channel.onread = function(arrayBuffer) {
|
||||||
const recvHandle = channel.pendingHandle;
|
const recvHandle = channel.pendingHandle;
|
||||||
@ -552,21 +565,7 @@ function setupChannel(target, channel) {
|
|||||||
if (recvHandle)
|
if (recvHandle)
|
||||||
pendingHandle = recvHandle;
|
pendingHandle = recvHandle;
|
||||||
|
|
||||||
// Linebreak is used as a message end sign
|
for (const message of parseChannelMessages(channel, pool)) {
|
||||||
var chunks = decoder.write(pool).split('\n');
|
|
||||||
var numCompleteChunks = chunks.length - 1;
|
|
||||||
// Last line does not have trailing linebreak
|
|
||||||
var incompleteChunk = chunks[numCompleteChunks];
|
|
||||||
if (numCompleteChunks === 0) {
|
|
||||||
jsonBuffer += incompleteChunk;
|
|
||||||
this.buffering = jsonBuffer.length !== 0;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
chunks[0] = jsonBuffer + chunks[0];
|
|
||||||
|
|
||||||
for (var i = 0; i < numCompleteChunks; i++) {
|
|
||||||
var message = JSON.parse(chunks[i]);
|
|
||||||
|
|
||||||
// There will be at most one NODE_HANDLE message in every chunk we
|
// There will be at most one NODE_HANDLE message in every chunk we
|
||||||
// read because SCM_RIGHTS messages don't get coalesced. Make sure
|
// read because SCM_RIGHTS messages don't get coalesced. Make sure
|
||||||
// that we deliver the handle with the right message however.
|
// that we deliver the handle with the right message however.
|
||||||
@ -581,9 +580,6 @@ function setupChannel(target, channel) {
|
|||||||
handleMessage(message, undefined, false);
|
handleMessage(message, undefined, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
jsonBuffer = incompleteChunk;
|
|
||||||
this.buffering = jsonBuffer.length !== 0;
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
this.buffering = false;
|
this.buffering = false;
|
||||||
target.disconnect();
|
target.disconnect();
|
||||||
@ -782,8 +778,7 @@ function setupChannel(target, channel) {
|
|||||||
|
|
||||||
const req = new WriteWrap();
|
const req = new WriteWrap();
|
||||||
|
|
||||||
const string = JSON.stringify(message) + '\n';
|
const err = writeChannelMessage(channel, req, message, handle);
|
||||||
const err = channel.writeUtf8String(req, string, handle);
|
|
||||||
const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
|
const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
|
||||||
|
|
||||||
if (err === 0) {
|
if (err === 0) {
|
||||||
|
119
lib/internal/child_process/serialization.js
Normal file
119
lib/internal/child_process/serialization.js
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const { JSON } = primordials;
|
||||||
|
const { Buffer } = require('buffer');
|
||||||
|
const { StringDecoder } = require('string_decoder');
|
||||||
|
const v8 = require('v8');
|
||||||
|
const { isArrayBufferView } = require('internal/util/types');
|
||||||
|
const assert = require('internal/assert');
|
||||||
|
|
||||||
|
const kMessageBuffer = Symbol('kMessageBuffer');
|
||||||
|
const kJSONBuffer = Symbol('kJSONBuffer');
|
||||||
|
const kStringDecoder = Symbol('kStringDecoder');
|
||||||
|
|
||||||
|
// Extend V8's serializer APIs to give more JSON-like behaviour in
|
||||||
|
// some cases; in particular, for native objects this serializes them the same
|
||||||
|
// way that JSON does rather than throwing an exception.
|
||||||
|
const kArrayBufferViewTag = 0;
|
||||||
|
const kNotArrayBufferViewTag = 1;
|
||||||
|
class ChildProcessSerializer extends v8.DefaultSerializer {
|
||||||
|
_writeHostObject(object) {
|
||||||
|
if (isArrayBufferView(object)) {
|
||||||
|
this.writeUint32(kArrayBufferViewTag);
|
||||||
|
return super._writeHostObject(object);
|
||||||
|
} else {
|
||||||
|
this.writeUint32(kNotArrayBufferViewTag);
|
||||||
|
this.writeValue({ ...object });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ChildProcessDeserializer extends v8.DefaultDeserializer {
|
||||||
|
_readHostObject() {
|
||||||
|
const tag = this.readUint32();
|
||||||
|
if (tag === kArrayBufferViewTag)
|
||||||
|
return super._readHostObject();
|
||||||
|
|
||||||
|
assert(tag === kNotArrayBufferViewTag);
|
||||||
|
return this.readValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Messages are parsed in either of the following formats:
|
||||||
|
// - Newline-delimited JSON, or
|
||||||
|
// - V8-serialized buffers, prefixed with their length as a big endian uint32
|
||||||
|
// (aka 'advanced')
|
||||||
|
const advanced = {
|
||||||
|
initMessageChannel(channel) {
|
||||||
|
channel[kMessageBuffer] = Buffer.alloc(0);
|
||||||
|
channel.buffering = false;
|
||||||
|
},
|
||||||
|
|
||||||
|
*parseChannelMessages(channel, readData) {
|
||||||
|
if (readData.length === 0) return;
|
||||||
|
|
||||||
|
let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
|
||||||
|
while (messageBuffer.length > 4) {
|
||||||
|
const size = messageBuffer.readUInt32BE();
|
||||||
|
if (messageBuffer.length < 4 + size) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deserializer = new ChildProcessDeserializer(
|
||||||
|
messageBuffer.subarray(4, 4 + size));
|
||||||
|
messageBuffer = messageBuffer.subarray(4 + size);
|
||||||
|
|
||||||
|
deserializer.readHeader();
|
||||||
|
yield deserializer.readValue();
|
||||||
|
}
|
||||||
|
channel[kMessageBuffer] = messageBuffer;
|
||||||
|
channel.buffering = messageBuffer.length > 0;
|
||||||
|
},
|
||||||
|
|
||||||
|
writeChannelMessage(channel, req, message, handle) {
|
||||||
|
const ser = new ChildProcessSerializer();
|
||||||
|
ser.writeHeader();
|
||||||
|
ser.writeValue(message);
|
||||||
|
const serializedMessage = ser.releaseBuffer();
|
||||||
|
const sizeBuffer = Buffer.allocUnsafe(4);
|
||||||
|
sizeBuffer.writeUInt32BE(serializedMessage.length);
|
||||||
|
return channel.writeBuffer(req, Buffer.concat([
|
||||||
|
sizeBuffer,
|
||||||
|
serializedMessage
|
||||||
|
]), handle);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const json = {
|
||||||
|
initMessageChannel(channel) {
|
||||||
|
channel[kJSONBuffer] = '';
|
||||||
|
channel[kStringDecoder] = undefined;
|
||||||
|
},
|
||||||
|
|
||||||
|
*parseChannelMessages(channel, readData) {
|
||||||
|
if (readData.length === 0) return;
|
||||||
|
|
||||||
|
if (channel[kStringDecoder] === undefined)
|
||||||
|
channel[kStringDecoder] = new StringDecoder('utf8');
|
||||||
|
const chunks = channel[kStringDecoder].write(readData).split('\n');
|
||||||
|
const numCompleteChunks = chunks.length - 1;
|
||||||
|
// Last line does not have trailing linebreak
|
||||||
|
const incompleteChunk = chunks[numCompleteChunks];
|
||||||
|
if (numCompleteChunks === 0) {
|
||||||
|
channel[kJSONBuffer] += incompleteChunk;
|
||||||
|
} else {
|
||||||
|
chunks[0] = channel[kJSONBuffer] + chunks[0];
|
||||||
|
for (let i = 0; i < numCompleteChunks; i++)
|
||||||
|
yield JSON.parse(chunks[i]);
|
||||||
|
channel[kJSONBuffer] = incompleteChunk;
|
||||||
|
}
|
||||||
|
channel.buffering = channel[kJSONBuffer].length !== 0;
|
||||||
|
},
|
||||||
|
|
||||||
|
writeChannelMessage(channel, req, message, handle) {
|
||||||
|
const string = JSON.stringify(message) + '\n';
|
||||||
|
return channel.writeUtf8String(req, string, handle);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = { advanced, json };
|
@ -130,6 +130,7 @@ function createWorkerProcess(id, env) {
|
|||||||
return fork(cluster.settings.exec, cluster.settings.args, {
|
return fork(cluster.settings.exec, cluster.settings.args, {
|
||||||
cwd: cluster.settings.cwd,
|
cwd: cluster.settings.cwd,
|
||||||
env: workerEnv,
|
env: workerEnv,
|
||||||
|
serialization: cluster.settings.serialization,
|
||||||
silent: cluster.settings.silent,
|
silent: cluster.settings.silent,
|
||||||
windowsHide: cluster.settings.windowsHide,
|
windowsHide: cluster.settings.windowsHide,
|
||||||
execArgv: execArgv,
|
execArgv: execArgv,
|
||||||
|
1
node.gyp
1
node.gyp
@ -91,6 +91,7 @@
|
|||||||
'lib/internal/buffer.js',
|
'lib/internal/buffer.js',
|
||||||
'lib/internal/cli_table.js',
|
'lib/internal/cli_table.js',
|
||||||
'lib/internal/child_process.js',
|
'lib/internal/child_process.js',
|
||||||
|
'lib/internal/child_process/serialization.js',
|
||||||
'lib/internal/cluster/child.js',
|
'lib/internal/cluster/child.js',
|
||||||
'lib/internal/cluster/master.js',
|
'lib/internal/cluster/master.js',
|
||||||
'lib/internal/cluster/round_robin_handle.js',
|
'lib/internal/cluster/round_robin_handle.js',
|
||||||
|
@ -180,12 +180,26 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Local<Object> req_wrap_obj = args[0].As<Object>();
|
Local<Object> req_wrap_obj = args[0].As<Object>();
|
||||||
|
|
||||||
uv_buf_t buf;
|
uv_buf_t buf;
|
||||||
buf.base = Buffer::Data(args[1]);
|
buf.base = Buffer::Data(args[1]);
|
||||||
buf.len = Buffer::Length(args[1]);
|
buf.len = Buffer::Length(args[1]);
|
||||||
|
|
||||||
StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
|
uv_stream_t* send_handle = nullptr;
|
||||||
|
|
||||||
|
if (args[2]->IsObject() && IsIPCPipe()) {
|
||||||
|
Local<Object> send_handle_obj = args[2].As<Object>();
|
||||||
|
|
||||||
|
HandleWrap* wrap;
|
||||||
|
ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
|
||||||
|
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
|
||||||
|
// Reference LibuvStreamWrap instance to prevent it from being garbage
|
||||||
|
// collected before `AfterWrite` is called.
|
||||||
|
req_wrap_obj->Set(env->context(),
|
||||||
|
env->handle_string(),
|
||||||
|
send_handle_obj).Check();
|
||||||
|
}
|
||||||
|
|
||||||
|
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
|
||||||
SetWriteResult(res);
|
SetWriteResult(res);
|
||||||
|
|
||||||
return res.err;
|
return res.err;
|
||||||
|
46
test/parallel/test-child-process-advanced-serialization.js
Normal file
46
test/parallel/test-child-process-advanced-serialization.js
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
'use strict';
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const child_process = require('child_process');
|
||||||
|
const { once } = require('events');
|
||||||
|
|
||||||
|
if (process.argv[2] !== 'child') {
|
||||||
|
for (const value of [null, 42, Infinity, 'foo']) {
|
||||||
|
common.expectsError(() => {
|
||||||
|
child_process.spawn(process.execPath, [], { serialization: value });
|
||||||
|
}, {
|
||||||
|
code: 'ERR_INVALID_OPT_VALUE',
|
||||||
|
message: `The value "${value}" is invalid ` +
|
||||||
|
'for option "options.serialization"'
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
const cp = child_process.spawn(process.execPath, [__filename, 'child'],
|
||||||
|
{
|
||||||
|
stdio: ['ipc', 'inherit', 'inherit'],
|
||||||
|
serialization: 'advanced'
|
||||||
|
});
|
||||||
|
|
||||||
|
const circular = {};
|
||||||
|
circular.circular = circular;
|
||||||
|
for await (const message of [
|
||||||
|
{ uint8: new Uint8Array(4) },
|
||||||
|
{ float64: new Float64Array([ Math.PI ]) },
|
||||||
|
{ buffer: Buffer.from('Hello!') },
|
||||||
|
{ map: new Map([{ a: 1 }, { b: 2 }]) },
|
||||||
|
{ bigInt: 1337n },
|
||||||
|
circular,
|
||||||
|
new Error('Something went wrong'),
|
||||||
|
new RangeError('Something range-y went wrong'),
|
||||||
|
]) {
|
||||||
|
cp.send(message);
|
||||||
|
const [ received ] = await once(cp, 'message');
|
||||||
|
assert.deepStrictEqual(received, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
cp.disconnect();
|
||||||
|
})().then(common.mustCall());
|
||||||
|
} else {
|
||||||
|
process.on('message', (msg) => process.send(msg));
|
||||||
|
}
|
22
test/parallel/test-cluster-advanced-serialization.js
Normal file
22
test/parallel/test-cluster-advanced-serialization.js
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
'use strict';
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const cluster = require('cluster');
|
||||||
|
|
||||||
|
if (cluster.isMaster) {
|
||||||
|
cluster.settings.serialization = 'advanced';
|
||||||
|
const worker = cluster.fork();
|
||||||
|
const circular = {};
|
||||||
|
circular.circular = circular;
|
||||||
|
|
||||||
|
worker.on('online', common.mustCall(() => {
|
||||||
|
worker.send(circular);
|
||||||
|
|
||||||
|
worker.on('message', common.mustCall((msg) => {
|
||||||
|
assert.deepStrictEqual(msg, circular);
|
||||||
|
worker.kill();
|
||||||
|
}));
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
process.on('message', (msg) => process.send(msg));
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user