node/lib/internal/streams/end-of-stream.js
Robert Nagy c15a27cab3 stream: don't wait for close on legacy streams
Try to detect non standard streams and don't wait for
'close' on these. In particular if we detected
that destroyed is true before we expect it to be then
fallback to compat behavior.

Fixes: https://github.com/nodejs/node/issues/33050

PR-URL: https://github.com/nodejs/node/pull/33058
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Reviewed-By: Zeyu Yang <himself65@outlook.com>
2020-04-27 17:08:26 +02:00

183 lines
5.5 KiB
JavaScript

// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).
'use strict';
const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;
const { once } = require('internal/util');
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}
function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}
function isWritableFinished(stream) {
if (stream.writableFinished) return true;
const wState = stream._writableState;
if (!wState || wState.errored) return false;
return wState.finished || (wState.ended && wState.length === 0);
}
function nop() {}
function isReadableEnded(stream) {
if (stream.readableEnded) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
return rState.endEmitted || (rState.ended && rState.length === 0);
}
function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
opts = {};
} else if (opts == null) {
opts = {};
} else if (typeof opts !== 'object') {
throw new ERR_INVALID_ARG_TYPE('opts', 'object', opts);
}
if (typeof callback !== 'function') {
throw new ERR_INVALID_ARG_TYPE('callback', 'function', callback);
}
callback = once(callback);
const readable = opts.readable ||
(opts.readable !== false && isReadable(stream));
const writable = opts.writable ||
(opts.writable !== false && isWritable(stream));
const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;
const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};
// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
let willEmitClose = (
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false &&
isReadable(stream) === readable &&
isWritable(stream) === writable
);
let writableFinished = stream.writableFinished ||
(wState && wState.finished);
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
if (stream.destroyed) willEmitClose = false;
if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
};
let readableEnded = stream.readableEnded ||
(rState && rState.endEmitted);
const onend = () => {
readableEnded = true;
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
if (stream.destroyed) willEmitClose = false;
if (willEmitClose && (!stream.writable || writable)) return;
if (!writable || writableFinished) callback.call(stream);
};
const onerror = (err) => {
callback.call(stream, err);
};
const onclose = () => {
if (readable && !readableEnded) {
if (!isReadableEnded(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
callback.call(stream);
};
const onrequest = () => {
stream.req.on('finish', onfinish);
};
if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}
// Not all streams will emit 'close' after 'aborted'.
if (typeof stream.aborted === 'boolean') {
stream.on('aborted', onclose);
}
stream.on('end', onend);
stream.on('finish', onfinish);
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);
const closed = (wState && wState.closed) || (rState && rState.closed) ||
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
(wState && wState.finished) || (rState && rState.endEmitted) ||
(rState && stream.req && stream.aborted);
if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
process.nextTick(() => {
callback();
});
}
return function() {
callback = nop;
stream.removeListener('aborted', onclose);
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
};
}
module.exports = eos;