node/lib/internal/stream_base_commons.js
Jameson Nash f3fbeafeec tls: improve handling of shutdown
RFC 5246 section-7.2.1 requires that the implementation must immediately
stop reading from the stream, as it is no longer TLS-encrypted. The
underlying stream is permitted to still pump events (and errors) to
other users, but those are now unencrypted, so we should not process
them here. But therefore, we do not want to stop the underlying stream,
as there could be another user of it, but we do need to remove ourselves
as a listener.

Per TLS v1.2, we should have also destroy the TLS state entirely here
(including the writing side), but this was revised in TLS v1.3 to permit
the stream to continue to flush output.

There appears to be some inconsistencies in the way nodejs handles
ownership of the underlying stream, with `TLS.close()` on the write side
also calling shutdown on the underlying stream (thus assuming other
users of the underlying stream are not permitted), while receiving EOF
on the read side leaves the underlying channel open. These
inconsistencies are left for a later person to resolve, if the extra
functionality is needed (as described in #35904). The current goal here
is to the fix the occasional CI exceptions depending on the timing of
these kernel messages through the TCP stack.

PR-URL: https://github.com/nodejs/node/pull/36111
Fixes: https://github.com/nodejs/node/pull/35946
Refs: https://github.com/libuv/libuv/pull/3036
Refs: https://github.com/nodejs/node/issues/35904
Co-authored-by: Momtchil Momtchev <momtchil@momtchev.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
2021-12-08 07:29:55 +01:00

281 lines
6.8 KiB
JavaScript

'use strict';
const {
Array,
Symbol,
} = primordials;
const { Buffer } = require('buffer');
const { FastBuffer } = require('internal/buffer');
const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
const {
errnoException
} = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols;
const {
kTimeout,
setUnrefTimeout,
getTimerDuration
} = require('internal/timers');
const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');
const { validateCallback } = require('internal/validators');
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');
function handleWriteReq(req, data, encoding) {
const { handle } = req;
switch (encoding) {
case 'buffer':
{
const ret = handle.writeBuffer(req, data);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = data;
return ret;
}
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);
case 'ascii':
return handle.writeAsciiString(req, data);
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
{
const buffer = Buffer.from(data, encoding);
const ret = handle.writeBuffer(req, buffer);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = buffer;
return ret;
}
}
}
function onWriteComplete(status) {
debug('onWriteComplete', status, this.error);
const stream = this.handle[owner_symbol];
if (stream.destroyed) {
if (typeof this.callback === 'function')
this.callback(null);
return;
}
// TODO (ronag): This should be moved before if(stream.destroyed)
// in order to avoid swallowing error.
if (status < 0) {
const ex = errnoException(status, 'write', this.error);
if (typeof this.callback === 'function')
this.callback(ex);
else
stream.destroy(ex);
return;
}
stream[kUpdateTimer]();
stream[kAfterAsyncWrite](this);
if (typeof this.callback === 'function')
this.callback(null);
}
function createWriteWrap(handle, callback) {
const req = new WriteWrap();
req.handle = handle;
req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
req.callback = callback;
return req;
}
function writevGeneric(self, data, cb) {
const req = createWriteWrap(self[kHandle], cb);
const allBuffers = data.allBuffers;
let chunks;
if (allBuffers) {
chunks = data;
for (let i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (let i = 0; i < data.length; i++) {
const entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
const err = req.handle.writev(req, chunks, allBuffers);
// Retain chunks
if (err === 0) req._chunks = chunks;
afterWriteDispatched(req, err, cb);
return req;
}
function writeGeneric(self, data, encoding, cb) {
const req = createWriteWrap(self[kHandle], cb);
const err = handleWriteReq(req, data, encoding);
afterWriteDispatched(req, err, cb);
return req;
}
function afterWriteDispatched(req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];
if (err !== 0)
return cb(errnoException(err, 'write', req.error));
if (!req.async && typeof req.callback === 'function') {
req.callback();
}
}
function onStreamRead(arrayBuffer) {
const nread = streamBaseState[kReadBytesOrError];
const handle = this;
const stream = this[owner_symbol];
stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) {
let ret;
let result;
const userBuf = stream[kBuffer];
if (userBuf) {
result = (stream[kBufferCb](nread, userBuf) !== false);
const bufGen = stream[kBufferGen];
if (bufGen !== null) {
const nextBuf = bufGen();
if (isUint8Array(nextBuf))
stream[kBuffer] = ret = nextBuf;
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
result = stream.push(buf);
}
if (!result) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
if (err)
stream.destroy(errnoException(err, 'read'));
}
}
return ret;
}
if (nread === 0) {
return;
}
// After seeing EOF, most streams will be closed permanently,
// and will not deliver any more read events after this point.
// (equivalently, it should have called readStop on itself already).
// Some streams may be reset and explicitly started again with a call
// to readStart, such as TTY.
if (nread !== UV_EOF) {
// CallJSOnreadMethod expects the return value to be a buffer.
// Ref: https://github.com/nodejs/node/pull/34375
stream.destroy(errnoException(nread, 'read'));
return;
}
// Defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (stream[kMaybeDestroy])
stream[kMaybeDestroy]();
} else {
if (stream[kMaybeDestroy])
stream.on('end', stream[kMaybeDestroy]);
// Push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
stream.push(null);
stream.read(0);
}
}
function setStreamTimeout(msecs, callback) {
if (this.destroyed)
return this;
this.timeout = msecs;
// Type checking identical to timers.enroll()
msecs = getTimerDuration(msecs, 'msecs');
// Attempt to clear an existing timer in both cases -
// even if it will be rescheduled we don't want to leak an existing timer.
clearTimeout(this[kTimeout]);
if (msecs === 0) {
if (callback !== undefined) {
validateCallback(callback);
this.removeListener('timeout', callback);
}
} else {
this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
if (this[kSession]) this[kSession][kUpdateTimer]();
if (callback !== undefined) {
validateCallback(callback);
this.once('timeout', callback);
}
}
return this;
}
module.exports = {
writevGeneric,
writeGeneric,
onStreamRead,
kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer,
kHandle,
kSession,
setStreamTimeout,
kBuffer,
kBufferCb,
kBufferGen
};