mirror of
https://github.com/nodejs/node.git
synced 2025-05-04 06:27:35 +00:00

A JS stream socket wraps a stream, exposing it as a socket for something on top which needs a socket specifically (e.g. an HTTP server). If the internal stream is closed in the same tick as the layer on top attempts to close this stream, the race between doShutdown and doClose results in an uncatchable exception. A similar race can happen with doClose and doWrite. It seems legitimate these can happen in parallel, so this resolves that by explicitly detecting and handling that situation: if a close is in progress, both doShutdown & doWrite allow doClose to run finishShutdown/Write for them, cancelling the operation, without trying to use this._handle (which will be null) in the meantime. PR-URL: https://github.com/nodejs/node/pull/49400 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
263 lines
8.0 KiB
JavaScript
263 lines
8.0 KiB
JavaScript
'use strict';
|
|
|
|
const {
|
|
Symbol,
|
|
} = primordials;
|
|
|
|
const { setImmediate } = require('timers');
|
|
const assert = require('internal/assert');
|
|
const { Socket } = require('net');
|
|
const { JSStream } = internalBinding('js_stream');
|
|
const uv = internalBinding('uv');
|
|
let debug = require('internal/util/debuglog').debuglog(
|
|
'stream_socket',
|
|
(fn) => {
|
|
debug = fn;
|
|
},
|
|
);
|
|
const { owner_symbol } = require('internal/async_hooks').symbols;
|
|
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
|
|
|
|
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
|
|
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
|
|
const kPendingShutdownRequest = Symbol('kPendingShutdownRequest');
|
|
const kPendingClose = Symbol('kPendingClose');
|
|
|
|
function isClosing() { return this[owner_symbol].isClosing(); }
|
|
|
|
function onreadstart() { return this[owner_symbol].readStart(); }
|
|
|
|
function onreadstop() { return this[owner_symbol].readStop(); }
|
|
|
|
function onshutdown(req) { return this[owner_symbol].doShutdown(req); }
|
|
|
|
function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); }
|
|
|
|
/* This class serves as a wrapper for when the C++ side of Node wants access
|
|
* to a standard JS stream. For example, TLS or HTTP do not operate on network
|
|
* resources conceptually, although that is the common case and what we are
|
|
* optimizing for; in theory, they are completely composable and can work with
|
|
* any stream resource they see.
|
|
*
|
|
* For the common case, i.e. a TLS socket wrapping around a net.Socket, we
|
|
* can skip going through the JS layer and let TLS access the raw C++ handle
|
|
* of a net.Socket. The flipside of this is that, to maintain composability,
|
|
* we need a way to create "fake" net.Socket instances that call back into a
|
|
* "real" JavaScript stream. JSStreamSocket is exactly this.
|
|
*/
|
|
class JSStreamSocket extends Socket {
|
|
constructor(stream) {
|
|
const handle = new JSStream();
|
|
handle.close = (cb) => {
|
|
debug('close');
|
|
this.doClose(cb);
|
|
};
|
|
// Inside of the following functions, `this` refers to the handle
|
|
// and `this[owner_symbol]` refers to this JSStreamSocket instance.
|
|
handle.isClosing = isClosing;
|
|
handle.onreadstart = onreadstart;
|
|
handle.onreadstop = onreadstop;
|
|
handle.onshutdown = onshutdown;
|
|
handle.onwrite = onwrite;
|
|
|
|
stream.pause();
|
|
stream.on('error', (err) => this.emit('error', err));
|
|
const ondata = (chunk) => {
|
|
if (typeof chunk === 'string' ||
|
|
stream.readableObjectMode === true) {
|
|
// Make sure that no further `data` events will happen.
|
|
stream.pause();
|
|
stream.removeListener('data', ondata);
|
|
|
|
this.emit('error', new ERR_STREAM_WRAP());
|
|
return;
|
|
}
|
|
|
|
debug('data', chunk.length);
|
|
if (this._handle)
|
|
this._handle.readBuffer(chunk);
|
|
};
|
|
stream.on('data', ondata);
|
|
stream.once('end', () => {
|
|
debug('end');
|
|
if (this._handle)
|
|
this._handle.emitEOF();
|
|
});
|
|
// Some `Stream` don't pass `hasError` parameters when closed.
|
|
stream.once('close', () => {
|
|
// Errors emitted from `stream` have also been emitted to this instance
|
|
// so that we don't pass errors to `destroy()` again.
|
|
this.destroy();
|
|
});
|
|
|
|
super({ handle, manualStart: true });
|
|
this.stream = stream;
|
|
this[kCurrentWriteRequest] = null;
|
|
this[kCurrentShutdownRequest] = null;
|
|
this[kPendingShutdownRequest] = null;
|
|
this[kPendingClose] = false;
|
|
this.readable = stream.readable;
|
|
this.writable = stream.writable;
|
|
|
|
// Start reading.
|
|
this.read(0);
|
|
}
|
|
|
|
// Allow legacy requires in the test suite to keep working:
|
|
// const { StreamWrap } = require('internal/js_stream_socket')
|
|
static get StreamWrap() {
|
|
return JSStreamSocket;
|
|
}
|
|
|
|
isClosing() {
|
|
return !this.readable || !this.writable;
|
|
}
|
|
|
|
readStart() {
|
|
this.stream.resume();
|
|
return 0;
|
|
}
|
|
|
|
readStop() {
|
|
this.stream.pause();
|
|
return 0;
|
|
}
|
|
|
|
doShutdown(req) {
|
|
// TODO(addaleax): It might be nice if we could get into a state where
|
|
// DoShutdown() is not called on streams while a write is still pending.
|
|
//
|
|
// Currently, the only part of the code base where that happens is the
|
|
// TLS implementation, which calls both DoWrite() and DoShutdown() on the
|
|
// underlying network stream inside of its own DoShutdown() method.
|
|
// Working around that on the native side is not quite trivial (yet?),
|
|
// so for now that is supported here.
|
|
|
|
if (this[kCurrentWriteRequest] !== null) {
|
|
this[kPendingShutdownRequest] = req;
|
|
return 0;
|
|
}
|
|
|
|
assert(this[kCurrentWriteRequest] === null);
|
|
assert(this[kCurrentShutdownRequest] === null);
|
|
this[kCurrentShutdownRequest] = req;
|
|
|
|
if (this[kPendingClose]) {
|
|
// If doClose is pending, the stream & this._handle are gone. We can't do
|
|
// anything. doClose will call finishShutdown with ECANCELED for us shortly.
|
|
return 0;
|
|
}
|
|
|
|
const handle = this._handle;
|
|
|
|
process.nextTick(() => {
|
|
// Ensure that write is dispatched asynchronously.
|
|
this.stream.end(() => {
|
|
this.finishShutdown(handle, 0);
|
|
});
|
|
});
|
|
return 0;
|
|
}
|
|
|
|
// handle === this._handle except when called from doClose().
|
|
finishShutdown(handle, errCode) {
|
|
// The shutdown request might already have been cancelled.
|
|
if (this[kCurrentShutdownRequest] === null)
|
|
return;
|
|
const req = this[kCurrentShutdownRequest];
|
|
this[kCurrentShutdownRequest] = null;
|
|
handle.finishShutdown(req, errCode);
|
|
}
|
|
|
|
doWrite(req, bufs) {
|
|
assert(this[kCurrentWriteRequest] === null);
|
|
assert(this[kCurrentShutdownRequest] === null);
|
|
|
|
if (this[kPendingClose]) {
|
|
// If doClose is pending, the stream & this._handle are gone. We can't do
|
|
// anything. doClose will call finishWrite with ECANCELED for us shortly.
|
|
this[kCurrentWriteRequest] = req; // Store req, for doClose to cancel
|
|
return 0;
|
|
}
|
|
|
|
const handle = this._handle;
|
|
const self = this;
|
|
|
|
let pending = bufs.length;
|
|
|
|
this.stream.cork();
|
|
// Use `var` over `let` for performance optimization.
|
|
// eslint-disable-next-line no-var
|
|
for (var i = 0; i < bufs.length; ++i)
|
|
this.stream.write(bufs[i], done);
|
|
this.stream.uncork();
|
|
|
|
// Only set the request here, because the `write()` calls could throw.
|
|
this[kCurrentWriteRequest] = req;
|
|
|
|
function done(err) {
|
|
if (!err && --pending !== 0)
|
|
return;
|
|
|
|
// Ensure that this is called once in case of error
|
|
pending = 0;
|
|
|
|
let errCode = 0;
|
|
if (err) {
|
|
errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
|
|
}
|
|
|
|
// Ensure that write was dispatched
|
|
setImmediate(() => {
|
|
self.finishWrite(handle, errCode);
|
|
});
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
// handle === this._handle except when called from doClose().
|
|
finishWrite(handle, errCode) {
|
|
// The write request might already have been cancelled.
|
|
if (this[kCurrentWriteRequest] === null)
|
|
return;
|
|
const req = this[kCurrentWriteRequest];
|
|
this[kCurrentWriteRequest] = null;
|
|
|
|
handle.finishWrite(req, errCode);
|
|
if (this[kPendingShutdownRequest]) {
|
|
const req = this[kPendingShutdownRequest];
|
|
this[kPendingShutdownRequest] = null;
|
|
this.doShutdown(req);
|
|
}
|
|
}
|
|
|
|
doClose(cb) {
|
|
this[kPendingClose] = true;
|
|
|
|
const handle = this._handle;
|
|
|
|
// When sockets of the "net" module destroyed, they will call
|
|
// `this._handle.close()` which will also emit EOF if not emitted before.
|
|
// This feature makes sockets on the other side emit "end" and "close"
|
|
// even though we haven't called `end()`. As `stream` are likely to be
|
|
// instances of `net.Socket`, calling `stream.destroy()` manually will
|
|
// avoid issues that don't properly close wrapped connections.
|
|
this.stream.destroy();
|
|
|
|
setImmediate(() => {
|
|
// Should be already set by net.js
|
|
assert(this._handle === null);
|
|
|
|
this.finishWrite(handle, uv.UV_ECANCELED);
|
|
this.finishShutdown(handle, uv.UV_ECANCELED);
|
|
|
|
this[kPendingClose] = false;
|
|
|
|
cb();
|
|
});
|
|
}
|
|
}
|
|
|
|
module.exports = JSStreamSocket;
|