child_process,cluster: allow using V8 serialization API

Add an `serialization` option that allows child process IPC to
use the (typically more powerful) V8 serialization API.

Fixes: https://github.com/nodejs/node/issues/10965

PR-URL: https://github.com/nodejs/node/pull/30162
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: David Carlier <devnexen@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
This commit is contained in:
Anna Henningsen 2019-10-29 15:15:36 +01:00
parent f17e414dc4
commit 973f324463
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
13 changed files with 302 additions and 38 deletions

View File

@ -7,16 +7,25 @@ if (cluster.isMaster) {
workers: [1],
payload: ['string', 'object'],
sendsPerBroadcast: [1, 10],
serialization: ['json', 'advanced'],
n: [1e5]
});
function main({ n, workers, sendsPerBroadcast, payload }) {
function main({
n,
workers,
sendsPerBroadcast,
payload,
serialization
}) {
const expectedPerBroadcast = sendsPerBroadcast * workers;
var readies = 0;
var broadcasts = 0;
var msgCount = 0;
var data;
cluster.settings.serialization = serialization;
switch (payload) {
case 'string':
data = 'hello world!';

View File

@ -321,6 +321,9 @@ arbitrary command execution.**
<!-- YAML
added: v0.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30162
description: The `serialization` option is supported now.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/10866
description: The `stdio` option can now be a string.
@ -340,6 +343,9 @@ changes:
* `execPath` {string} Executable used to create the child process.
* `execArgv` {string[]} List of string arguments passed to the executable.
**Default:** `process.execArgv`.
* `serialization` {string} Specify the kind of serialization used for sending
messages between processes. Possible values are `'json'` and `'advanced'`.
See [Advanced Serialization][] for more details. **Default:** `'json'`.
* `silent` {boolean} If `true`, stdin, stdout, and stderr of the child will be
piped to the parent, otherwise they will be inherited from the parent, see
the `'pipe'` and `'inherit'` options for [`child_process.spawn()`][]'s
@ -386,6 +392,9 @@ The `shell` option available in [`child_process.spawn()`][] is not supported by
<!-- YAML
added: v0.1.90
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30162
description: The `serialization` option is supported now.
- version: v8.8.0
pr-url: https://github.com/nodejs/node/pull/15380
description: The `windowsHide` option is supported now.
@ -411,6 +420,9 @@ changes:
[`options.detached`][]).
* `uid` {number} Sets the user identity of the process (see setuid(2)).
* `gid` {number} Sets the group identity of the process (see setgid(2)).
* `serialization` {string} Specify the kind of serialization used for sending
messages between processes. Possible values are `'json'` and `'advanced'`.
See [Advanced Serialization][] for more details. **Default:** `'json'`.
* `shell` {boolean|string} If `true`, runs `command` inside of a shell. Uses
`'/bin/sh'` on Unix, and `process.env.ComSpec` on Windows. A different
shell can be specified as a string. See [Shell Requirements][] and
@ -998,6 +1010,11 @@ The `'message'` event is triggered when a child process uses
The message goes through serialization and parsing. The resulting
message might not be the same as what is originally sent.
If the `serialization` option was set to `'advanced'` used when spawning the
child process, the `message` argument can contain data that JSON is not able
to represent.
See [Advanced Serialization][] for more details.
### subprocess.channel
<!-- YAML
added: v7.1.0
@ -1472,6 +1489,26 @@ the same requirement. Thus, in `child_process` functions where a shell can be
spawned, `'cmd.exe'` is used as a fallback if `process.env.ComSpec` is
unavailable.
## Advanced Serialization
<!-- YAML
added: REPLACEME
-->
Child processes support a serialization mechanism for IPC that is based on the
[serialization API of the `v8` module][v8.serdes], based on the
[HTML structured clone algorithm][]. This is generally more powerful and
supports more built-in JavaScript object types, such as `BigInt`, `Map`
and `Set`, `ArrayBuffer` and `TypedArray`, `Buffer`, `Error`, `RegExp` etc.
However, this format is not a full superset of JSON, and e.g. properties set on
objects of such built-in types will not be passed on through the serialization
step. Additionally, performance may not be equivalent to that of JSON, depending
on the structure of the passed data.
Therefore, this feature requires opting in by setting the
`serialization` option to `'advanced'` when calling [`child_process.spawn()`][]
or [`child_process.fork()`][].
[Advanced Serialization]: #child_process_advanced_serialization
[`'disconnect'`]: process.html#process_event_disconnect
[`'error'`]: #child_process_event_error
[`'exit'`]: #child_process_event_exit
@ -1505,5 +1542,7 @@ unavailable.
[`subprocess.stdout`]: #child_process_subprocess_stdout
[`util.promisify()`]: util.html#util_util_promisify_original
[Default Windows Shell]: #child_process_default_windows_shell
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
[Shell Requirements]: #child_process_shell_requirements
[synchronous counterparts]: #child_process_synchronous_process_creation
[v8.serdes]: v8.html#v8_serialization_api

View File

@ -724,6 +724,9 @@ values are `'rr'` and `'none'`.
<!-- YAML
added: v0.7.1
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30162
description: The `serialization` option is supported now.
- version: v9.5.0
pr-url: https://github.com/nodejs/node/pull/18399
description: The `cwd` option is supported now.
@ -746,6 +749,10 @@ changes:
**Default:** `process.argv.slice(2)`.
* `cwd` {string} Current working directory of the worker process. **Default:**
`undefined` (inherits from parent process).
* `serialization` {string} Specify the kind of serialization used for sending
messages between processes. Possible values are `'json'` and `'advanced'`.
See [Advanced Serialization for `child_process`][] for more details.
**Default:** `false`.
* `silent` {boolean} Whether or not to send output to parent's stdio.
**Default:** `false`.
* `stdio` {Array} Configures the stdio of forked processes. Because the
@ -874,4 +881,5 @@ socket.on('data', (id) => {
[`process` event: `'message'`]: process.html#process_event_message
[`server.close()`]: net.html#net_event_close
[`worker.exitedAfterDisconnect`]: #cluster_worker_exitedafterdisconnect
[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
[Child Process module]: child_process.html#child_process_child_process_fork_modulepath_args_options

View File

@ -119,6 +119,11 @@ the child process.
The message goes through serialization and parsing. The resulting message might
not be the same as what is originally sent.
If the `serialization` option was set to `advanced` used when spawning the
process, the `message` argument can contain data that JSON is not able
to represent.
See [Advanced Serialization for `child_process`][] for more details.
### Event: 'multipleResolves'
<!-- YAML
added: v10.12.0
@ -2456,6 +2461,7 @@ cases:
[`require.resolve()`]: modules.html#modules_require_resolve_request_options
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
[`v8.setFlagsFromString()`]: v8.html#v8_v8_setflagsfromstring_flags
[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
[Android building]: https://github.com/nodejs/node/blob/master/BUILDING.md#androidandroid-based-devices-eg-firefox-os
[Child Process]: child_process.html
[Cluster]: cluster.html

View File

@ -108,12 +108,12 @@ function fork(modulePath /* , args, options */) {
return spawn(options.execPath, args, options);
}
function _forkChild(fd) {
function _forkChild(fd, serializationMode) {
// set process.send()
const p = new Pipe(PipeConstants.IPC);
p.open(fd);
p.unref();
const control = setupChannel(process, p);
const control = setupChannel(process, p, serializationMode);
process.on('newListener', function onNewListener(name) {
if (name === 'message' || name === 'disconnect') control.ref();
});

View File

@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() {
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_CHANNEL_FD;
require('child_process')._forkChild(fd);
const serializationMode =
process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json';
delete process.env.NODE_CHANNEL_SERIALIZATION_MODE;
require('child_process')._forkChild(fd, serializationMode);
assert(process.send);
}
}

View File

@ -1,6 +1,6 @@
'use strict';
const { JSON, Object } = primordials;
const { Object } = primordials;
const {
errnoException,
@ -55,8 +55,6 @@ const {
const { SocketListSend, SocketListReceive } = SocketList;
// Lazy loaded for startup performance.
let StringDecoder;
// Lazy loaded for startup performance and to allow monkey patching of
// internalBinding('http_parser').HTTPParser.
let freeParser;
@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) {
const ipcFd = stdio.ipcFd;
stdio = options.stdio = stdio.stdio;
if (options.serialization !== undefined &&
options.serialization !== 'json' &&
options.serialization !== 'advanced') {
throw new ERR_INVALID_OPT_VALUE('options.serialization',
options.serialization);
}
const serialization = options.serialization || 'json';
if (ipc !== undefined) {
// Let child process know about opened IPC channel
if (options.envPairs === undefined)
@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) {
options.envPairs);
}
options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
}
validateString(options.file, 'options.file');
@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) {
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
// Add .send() method and start listening for IPC data
if (ipc !== undefined) setupChannel(this, ipc);
if (ipc !== undefined) setupChannel(this, ipc, serialization);
return err;
};
@ -516,7 +524,8 @@ class Control extends EventEmitter {
const channelDeprecationMsg = '_channel is deprecated. ' +
'Use ChildProcess.channel instead.';
function setupChannel(target, channel) {
let serialization;
function setupChannel(target, channel, serializationMode) {
target.channel = channel;
Object.defineProperty(target, '_channel', {
@ -535,12 +544,16 @@ function setupChannel(target, channel) {
const control = new Control(channel);
if (StringDecoder === undefined)
StringDecoder = require('string_decoder').StringDecoder;
const decoder = new StringDecoder('utf8');
var jsonBuffer = '';
var pendingHandle = null;
channel.buffering = false;
if (serialization === undefined)
serialization = require('internal/child_process/serialization');
const {
initMessageChannel,
parseChannelMessages,
writeChannelMessage
} = serialization[serializationMode];
let pendingHandle = null;
initMessageChannel(channel);
channel.pendingHandle = null;
channel.onread = function(arrayBuffer) {
const recvHandle = channel.pendingHandle;
@ -552,21 +565,7 @@ function setupChannel(target, channel) {
if (recvHandle)
pendingHandle = recvHandle;
// Linebreak is used as a message end sign
var chunks = decoder.write(pool).split('\n');
var numCompleteChunks = chunks.length - 1;
// Last line does not have trailing linebreak
var incompleteChunk = chunks[numCompleteChunks];
if (numCompleteChunks === 0) {
jsonBuffer += incompleteChunk;
this.buffering = jsonBuffer.length !== 0;
return;
}
chunks[0] = jsonBuffer + chunks[0];
for (var i = 0; i < numCompleteChunks; i++) {
var message = JSON.parse(chunks[i]);
for (const message of parseChannelMessages(channel, pool)) {
// There will be at most one NODE_HANDLE message in every chunk we
// read because SCM_RIGHTS messages don't get coalesced. Make sure
// that we deliver the handle with the right message however.
@ -581,9 +580,6 @@ function setupChannel(target, channel) {
handleMessage(message, undefined, false);
}
}
jsonBuffer = incompleteChunk;
this.buffering = jsonBuffer.length !== 0;
} else {
this.buffering = false;
target.disconnect();
@ -782,8 +778,7 @@ function setupChannel(target, channel) {
const req = new WriteWrap();
const string = JSON.stringify(message) + '\n';
const err = channel.writeUtf8String(req, string, handle);
const err = writeChannelMessage(channel, req, message, handle);
const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
if (err === 0) {

View File

@ -0,0 +1,119 @@
'use strict';
const { JSON } = primordials;
const { Buffer } = require('buffer');
const { StringDecoder } = require('string_decoder');
const v8 = require('v8');
const { isArrayBufferView } = require('internal/util/types');
const assert = require('internal/assert');
const kMessageBuffer = Symbol('kMessageBuffer');
const kJSONBuffer = Symbol('kJSONBuffer');
const kStringDecoder = Symbol('kStringDecoder');
// Extend V8's serializer APIs to give more JSON-like behaviour in
// some cases; in particular, for native objects this serializes them the same
// way that JSON does rather than throwing an exception.
const kArrayBufferViewTag = 0;
const kNotArrayBufferViewTag = 1;
class ChildProcessSerializer extends v8.DefaultSerializer {
_writeHostObject(object) {
if (isArrayBufferView(object)) {
this.writeUint32(kArrayBufferViewTag);
return super._writeHostObject(object);
} else {
this.writeUint32(kNotArrayBufferViewTag);
this.writeValue({ ...object });
}
}
}
class ChildProcessDeserializer extends v8.DefaultDeserializer {
_readHostObject() {
const tag = this.readUint32();
if (tag === kArrayBufferViewTag)
return super._readHostObject();
assert(tag === kNotArrayBufferViewTag);
return this.readValue();
}
}
// Messages are parsed in either of the following formats:
// - Newline-delimited JSON, or
// - V8-serialized buffers, prefixed with their length as a big endian uint32
// (aka 'advanced')
const advanced = {
initMessageChannel(channel) {
channel[kMessageBuffer] = Buffer.alloc(0);
channel.buffering = false;
},
*parseChannelMessages(channel, readData) {
if (readData.length === 0) return;
let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
while (messageBuffer.length > 4) {
const size = messageBuffer.readUInt32BE();
if (messageBuffer.length < 4 + size) {
break;
}
const deserializer = new ChildProcessDeserializer(
messageBuffer.subarray(4, 4 + size));
messageBuffer = messageBuffer.subarray(4 + size);
deserializer.readHeader();
yield deserializer.readValue();
}
channel[kMessageBuffer] = messageBuffer;
channel.buffering = messageBuffer.length > 0;
},
writeChannelMessage(channel, req, message, handle) {
const ser = new ChildProcessSerializer();
ser.writeHeader();
ser.writeValue(message);
const serializedMessage = ser.releaseBuffer();
const sizeBuffer = Buffer.allocUnsafe(4);
sizeBuffer.writeUInt32BE(serializedMessage.length);
return channel.writeBuffer(req, Buffer.concat([
sizeBuffer,
serializedMessage
]), handle);
},
};
const json = {
initMessageChannel(channel) {
channel[kJSONBuffer] = '';
channel[kStringDecoder] = undefined;
},
*parseChannelMessages(channel, readData) {
if (readData.length === 0) return;
if (channel[kStringDecoder] === undefined)
channel[kStringDecoder] = new StringDecoder('utf8');
const chunks = channel[kStringDecoder].write(readData).split('\n');
const numCompleteChunks = chunks.length - 1;
// Last line does not have trailing linebreak
const incompleteChunk = chunks[numCompleteChunks];
if (numCompleteChunks === 0) {
channel[kJSONBuffer] += incompleteChunk;
} else {
chunks[0] = channel[kJSONBuffer] + chunks[0];
for (let i = 0; i < numCompleteChunks; i++)
yield JSON.parse(chunks[i]);
channel[kJSONBuffer] = incompleteChunk;
}
channel.buffering = channel[kJSONBuffer].length !== 0;
},
writeChannelMessage(channel, req, message, handle) {
const string = JSON.stringify(message) + '\n';
return channel.writeUtf8String(req, string, handle);
},
};
module.exports = { advanced, json };

View File

@ -130,6 +130,7 @@ function createWorkerProcess(id, env) {
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,

View File

@ -91,6 +91,7 @@
'lib/internal/buffer.js',
'lib/internal/cli_table.js',
'lib/internal/child_process.js',
'lib/internal/child_process/serialization.js',
'lib/internal/cluster/child.js',
'lib/internal/cluster/master.js',
'lib/internal/cluster/round_robin_handle.js',

View File

@ -180,12 +180,26 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
}
Local<Object> req_wrap_obj = args[0].As<Object>();
uv_buf_t buf;
buf.base = Buffer::Data(args[1]);
buf.len = Buffer::Length(args[1]);
StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
uv_stream_t* send_handle = nullptr;
if (args[2]->IsObject() && IsIPCPipe()) {
Local<Object> send_handle_obj = args[2].As<Object>();
HandleWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).Check();
}
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
SetWriteResult(res);
return res.err;

View File

@ -0,0 +1,46 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const child_process = require('child_process');
const { once } = require('events');
if (process.argv[2] !== 'child') {
for (const value of [null, 42, Infinity, 'foo']) {
common.expectsError(() => {
child_process.spawn(process.execPath, [], { serialization: value });
}, {
code: 'ERR_INVALID_OPT_VALUE',
message: `The value "${value}" is invalid ` +
'for option "options.serialization"'
});
}
(async () => {
const cp = child_process.spawn(process.execPath, [__filename, 'child'],
{
stdio: ['ipc', 'inherit', 'inherit'],
serialization: 'advanced'
});
const circular = {};
circular.circular = circular;
for await (const message of [
{ uint8: new Uint8Array(4) },
{ float64: new Float64Array([ Math.PI ]) },
{ buffer: Buffer.from('Hello!') },
{ map: new Map([{ a: 1 }, { b: 2 }]) },
{ bigInt: 1337n },
circular,
new Error('Something went wrong'),
new RangeError('Something range-y went wrong'),
]) {
cp.send(message);
const [ received ] = await once(cp, 'message');
assert.deepStrictEqual(received, message);
}
cp.disconnect();
})().then(common.mustCall());
} else {
process.on('message', (msg) => process.send(msg));
}

View File

@ -0,0 +1,22 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const cluster = require('cluster');
if (cluster.isMaster) {
cluster.settings.serialization = 'advanced';
const worker = cluster.fork();
const circular = {};
circular.circular = circular;
worker.on('online', common.mustCall(() => {
worker.send(circular);
worker.on('message', common.mustCall((msg) => {
assert.deepStrictEqual(msg, circular);
worker.kill();
}));
}));
} else {
process.on('message', (msg) => process.send(msg));
}