stream: updated streams error handling

This improves error handling for streams in a few ways.

1. It ensures that no user defined methods (_read, _write, ...) are run
after .destroy has been called.
2. It introduces an explicit error to tell the user if they are write to
write, etc to the stream after it has been destroyed.
3. It makes streams always emit close as the last thing after they have
been destroyed
4. Changes the default _destroy to not gracefully end streams.

It also updates net, http2, zlib and fs to the new error handling.

PR-URL: https://github.com/nodejs/node/pull/18438
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
Mathias Buus 2018-01-29 19:32:34 +01:00 committed by Matteo Collina
parent acac0f852a
commit 5e3f51648e
18 changed files with 107 additions and 55 deletions

View File

@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js
process. The error object will have an `err.info` object property with
additional details.
<a id="ERR_STREAM_DESTROYED"></a>
### ERR_STREAM_DESTROYED
A stream method was called that cannot complete because the stream was
destroyed using `stream.destroy()`.
<a id="ERR_TLS_CERT_ALTNAME_INVALID"></a>
### ERR_TLS_CERT_ALTNAME_INVALID
@ -1615,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object.
The current module's status does not allow for this operation. The specific
meaning of the error depends on the specific function.
<a id="ERR_ZLIB_BINDING_CLOSED"></a>
### ERR_ZLIB_BINDING_CLOSED
An attempt was made to use a `zlib` object after it has already been closed.
<a id="ERR_ZLIB_INITIALIZATION_FAILED"></a>
### ERR_ZLIB_INITIALIZATION_FAILED

View File

@ -543,8 +543,10 @@ added: v8.0.0
* Returns: {this}
Destroy the stream, and emit the passed error. After this call, the
writable stream has ended. Implementors should not override this method,
Destroy the stream, and emit the passed `error` and a `close` event.
After this call, the writable stream has ended and subsequent calls
to `write` / `end` will give an `ERR_STREAM_DESTROYED` error.
Implementors should not override this method,
but instead implement [`writable._destroy`][writable-_destroy].
### Readable Streams
@ -1167,8 +1169,9 @@ myReader.on('readable', () => {
added: v8.0.0
-->
Destroy the stream, and emit `'error'`. After this call, the
readable stream will release any internal resources.
Destroy the stream, and emit `'error'` and `close`. After this call, the
readable stream will release any internal resources and subsequent calls
to `push` will be ignored.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
@ -1382,6 +1385,12 @@ constructor and implement the `writable._write()` method. The
`writable._writev()` method *may* also be implemented.
#### Constructor: new stream.Writable([options])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18438
description: Add `emitClose` option to specify if `close` is emitted on destroy
-->
* `options` {Object}
* `highWaterMark` {number} Buffer level when
@ -1395,6 +1404,8 @@ constructor and implement the `writable._write()` method. The
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
Defaults to `false`
* `emitClose` {boolean} Whether or not the stream should emit `close`
after it has been destroyed. Defaults to `true`
* `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the

View File

@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', {
this._writableState.destroyed = value;
}
});
Duplex.prototype._destroy = function(err, cb) {
this.push(null);
this.end();
process.nextTick(cb, err);
};

View File

@ -106,6 +106,9 @@ function ReadableState(options, stream) {
this.readableListening = false;
this.resumeScheduled = false;
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
// has it been destroyed
this.destroyed = false;
@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
this.push(null);
cb(err);
};
@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
} else if (state.destroyed) {
return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {

View File

@ -132,7 +132,7 @@ function Transform(options) {
}
function prefinish() {
if (typeof this._flush === 'function') {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush((er, data) => {
done(this, er, data);
});
@ -194,7 +194,6 @@ Transform.prototype._read = function(n) {
Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
this.emit('close');
});
};

View File

@ -134,6 +134,9 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
// count buffered requests
this.bufferedRequestCount = 0;
@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
if (writev)
if (state.destroyed)
state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write'));
else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
@ -604,7 +609,7 @@ function callFinal(stream, state) {
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function') {
if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', {
Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
this.end();
cb(err);
};

View File

@ -1929,6 +1929,9 @@ function ReadStream(path, options) {
if (options.highWaterMark === undefined)
options.highWaterMark = 64 * 1024;
// for backwards compat do not emit close on destroy.
options.emitClose = false;
Readable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
@ -2084,6 +2087,9 @@ function WriteStream(path, options) {
options = copyObject(getOptions(options, {}));
// for backwards compat do not emit close on destroy.
options.emitClose = false;
Writable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy

View File

@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error);
E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error);
E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed');
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error);
@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
function sysError(code, syscall, path, dest,

View File

@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex {
constructor(session, options) {
options.allowHalfOpen = true;
options.decodeStrings = false;
options.emitClose = false;
super(options);
this[async_id_symbol] = -1;

View File

@ -30,6 +30,7 @@ function destroy(err, cb) {
}
this._destroy(err || null, (err) => {
process.nextTick(emitCloseNT, this);
if (!cb && err) {
process.nextTick(emitErrorNT, this, err);
if (this._writableState) {
@ -43,6 +44,14 @@ function destroy(err, cb) {
return this;
}
function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose)
return;
if (self._readableState && !self._readableState.emitClose)
return;
self.emit('close');
}
function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;

View File

@ -232,6 +232,11 @@ function Socket(options) {
options = { fd: options }; // Legacy interface.
else if (options === undefined)
options = {};
else
options = util._extend({}, options);
// For backwards compat do not emit close on destroy.
options.emitClose = false;
stream.Duplex.call(this, options);

View File

@ -25,7 +25,6 @@ const {
ERR_BUFFER_TOO_LARGE,
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_ZLIB_BINDING_CLOSED,
ERR_ZLIB_INITIALIZATION_FAILED
} = require('internal/errors').codes;
const Transform = require('_stream_transform');
@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) {
Zlib.prototype.close = function close(callback) {
_close(this, callback);
process.nextTick(emitCloseNT, this);
this.destroy();
};
Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) {
function processChunk(self, chunk, flushFlag, cb) {
var handle = self._handle;
if (!handle)
return cb(new ERR_ZLIB_BINDING_CLOSED());
assert(false, 'zlib binding closed');
handle.buffer = chunk;
handle.cb = cb;
@ -603,10 +602,6 @@ function _close(engine, callback) {
engine._handle = null;
}
function emitCloseNT(self) {
self.emit('close');
}
// generic zlib
// minimal 2-byte header
function Deflate(opts) {

View File

@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() {
// Test destroy returns this, even on multiple calls when it short-circuits.
assert.strictEqual(conn, conn.destroy().destroy());
conn.on('error', common.expectsError({
code: 'ERR_SOCKET_CLOSED',
message: 'Socket is closed',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed',
type: Error
}));
conn.write(Buffer.from('kaboom'), common.expectsError({
code: 'ERR_SOCKET_CLOSED',
message: 'Socket is closed',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed',
type: Error
}));
server.close();

View File

@ -13,8 +13,9 @@ const { inherits } = require('util');
duplex.resume();
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
duplex.on('end', common.mustNotCall());
duplex.on('finish', common.mustNotCall());
duplex.on('close', common.mustCall());
duplex.destroy();
assert.strictEqual(duplex.destroyed, true);
@ -29,8 +30,8 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
duplex.on('end', common.mustNotCall());
duplex.on('finish', common.mustNotCall());
duplex.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@ -78,6 +79,7 @@ const { inherits } = require('util');
// error is swallowed by the custom _destroy
duplex.on('error', common.mustNotCall('no error event'));
duplex.on('close', common.mustCall());
duplex.destroy(expected);
assert.strictEqual(duplex.destroyed, true);
@ -159,8 +161,8 @@ const { inherits } = require('util');
});
duplex.resume();
duplex.on('finish', common.mustCall());
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustNotCall());
duplex.on('end', common.mustNotCall());
duplex.destroy();
assert.strictEqual(duplex.destroyed, true);

View File

@ -11,7 +11,7 @@ const { inherits } = require('util');
});
read.resume();
read.on('end', common.mustCall());
read.on('close', common.mustCall());
read.destroy();
assert.strictEqual(read.destroyed, true);
@ -25,7 +25,8 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
read.on('end', common.mustCall());
read.on('end', common.mustNotCall('no end event'));
read.on('close', common.mustCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@ -47,6 +48,7 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
read.on('end', common.mustNotCall('no end event'));
read.on('close', common.mustCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@ -70,6 +72,7 @@ const { inherits } = require('util');
// error is swallowed by the custom _destroy
read.on('error', common.mustNotCall('no error event'));
read.on('close', common.mustCall());
read.destroy(expected);
assert.strictEqual(read.destroyed, true);
@ -106,6 +109,7 @@ const { inherits } = require('util');
const fail = common.mustNotCall('no end event');
read.on('end', fail);
read.on('close', common.mustCall());
read.destroy();
@ -170,7 +174,18 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
read.on('close', common.mustCall());
read.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(expected, err);
}));
}
{
const read = new Readable({
read() {}
});
read.destroy();
read.push('hi');
read.on('data', common.mustNotCall());
}

View File

@ -11,9 +11,9 @@ const assert = require('assert');
transform.resume();
transform.on('end', common.mustCall());
transform.on('end', common.mustNotCall());
transform.on('close', common.mustCall());
transform.on('finish', common.mustCall());
transform.on('finish', common.mustNotCall());
transform.destroy();
}
@ -26,8 +26,8 @@ const assert = require('assert');
const expected = new Error('kaboom');
transform.on('end', common.mustCall());
transform.on('finish', common.mustCall());
transform.on('end', common.mustNotCall());
transform.on('finish', common.mustNotCall());
transform.on('close', common.mustCall());
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
@ -49,7 +49,7 @@ const assert = require('assert');
const expected = new Error('kaboom');
transform.on('finish', common.mustNotCall('no finish event'));
transform.on('close', common.mustNotCall('no close event'));
transform.on('close', common.mustCall());
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@ -69,7 +69,7 @@ const assert = require('assert');
transform.resume();
transform.on('end', common.mustNotCall('no end event'));
transform.on('close', common.mustNotCall('no close event'));
transform.on('close', common.mustCall());
transform.on('finish', common.mustNotCall('no finish event'));
// error is swallowed by the custom _destroy
@ -110,7 +110,7 @@ const assert = require('assert');
transform.on('finish', fail);
transform.on('end', fail);
transform.on('close', fail);
transform.on('close', common.mustCall());
transform.destroy();
@ -132,7 +132,7 @@ const assert = require('assert');
cb(expected);
}, 1);
transform.on('close', common.mustNotCall('no close event'));
transform.on('close', common.mustCall());
transform.on('finish', common.mustNotCall('no finish event'));
transform.on('end', common.mustNotCall('no end event'));
transform.on('error', common.mustCall((err) => {

View File

@ -10,7 +10,8 @@ const { inherits } = require('util');
write(chunk, enc, cb) { cb(); }
});
write.on('finish', common.mustCall());
write.on('finish', common.mustNotCall());
write.on('close', common.mustCall());
write.destroy();
assert.strictEqual(write.destroyed, true);
@ -23,7 +24,8 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
write.on('finish', common.mustCall());
write.on('finish', common.mustNotCall());
write.on('close', common.mustCall());
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@ -45,6 +47,7 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
write.on('finish', common.mustNotCall('no finish event'));
write.on('close', common.mustCall());
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@ -65,6 +68,7 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
write.on('finish', common.mustNotCall('no finish event'));
write.on('close', common.mustCall());
// error is swallowed by the custom _destroy
write.on('error', common.mustNotCall('no error event'));
@ -103,6 +107,7 @@ const { inherits } = require('util');
const fail = common.mustNotCall('no finish event');
write.on('finish', fail);
write.on('close', common.mustCall());
write.destroy();
@ -123,6 +128,7 @@ const { inherits } = require('util');
cb(expected);
});
write.on('close', common.mustCall());
write.on('finish', common.mustNotCall('no finish event'));
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
@ -138,6 +144,7 @@ const { inherits } = require('util');
write(chunk, enc, cb) { cb(); }
});
write.on('close', common.mustCall());
write.on('error', common.mustCall());
write.destroy(new Error('kaboom 1'));
@ -155,7 +162,7 @@ const { inherits } = require('util');
assert.strictEqual(write.destroyed, true);
// the internal destroy() mechanism should not be triggered
write.on('finish', common.mustNotCall());
write.on('close', common.mustNotCall());
write.destroy();
}

View File

@ -29,9 +29,9 @@ zlib.gzip('hello', common.mustCall(function(err, out) {
common.expectsError(
() => unzip.write(out),
{
code: 'ERR_ZLIB_BINDING_CLOSED',
code: 'ERR_STREAM_DESTROYED',
type: Error,
message: 'zlib binding closed'
message: 'Cannot call write after a stream was destroyed'
}
);
}));