mirror of
https://github.com/nodejs/node.git
synced 2025-05-13 00:05:45 +00:00
stream: construct
Provide a standardized way of asynchronously creating and initializing resources before performing any work. Refs: https://github.com/nodejs/node/issues/29314 PR-URL: https://github.com/nodejs/node/pull/29656 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
This commit is contained in:
parent
9949a2e1e3
commit
fb8cc72e73
@ -550,8 +550,7 @@ added: v9.3.0
|
|||||||
|
|
||||||
* {number}
|
* {number}
|
||||||
|
|
||||||
Return the value of `highWaterMark` passed when constructing this
|
Return the value of `highWaterMark` passed when creating this `Writable`.
|
||||||
`Writable`.
|
|
||||||
|
|
||||||
##### `writable.writableLength`
|
##### `writable.writableLength`
|
||||||
<!-- YAML
|
<!-- YAML
|
||||||
@ -1193,8 +1192,7 @@ added: v9.3.0
|
|||||||
|
|
||||||
* {number}
|
* {number}
|
||||||
|
|
||||||
Returns the value of `highWaterMark` passed when constructing this
|
Returns the value of `highWaterMark` passed when creating this `Readable`.
|
||||||
`Readable`.
|
|
||||||
|
|
||||||
##### `readable.readableLength`
|
##### `readable.readableLength`
|
||||||
<!-- YAML
|
<!-- YAML
|
||||||
@ -1792,7 +1790,7 @@ expectations.
|
|||||||
added: v1.2.0
|
added: v1.2.0
|
||||||
-->
|
-->
|
||||||
|
|
||||||
For many simple cases, it is possible to construct a stream without relying on
|
For many simple cases, it is possible to create a stream without relying on
|
||||||
inheritance. This can be accomplished by directly creating instances of the
|
inheritance. This can be accomplished by directly creating instances of the
|
||||||
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
|
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
|
||||||
objects and passing appropriate methods as constructor options.
|
objects and passing appropriate methods as constructor options.
|
||||||
@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
|
|||||||
const { Writable } = require('stream');
|
const { Writable } = require('stream');
|
||||||
|
|
||||||
const myWritable = new Writable({
|
const myWritable = new Writable({
|
||||||
|
construct(callback) {
|
||||||
|
// Initialize state and load resources...
|
||||||
|
},
|
||||||
write(chunk, encoding, callback) {
|
write(chunk, encoding, callback) {
|
||||||
// ...
|
// ...
|
||||||
|
},
|
||||||
|
destroy() {
|
||||||
|
// Free resources...
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
```
|
```
|
||||||
@ -1861,6 +1865,8 @@ changes:
|
|||||||
[`stream._destroy()`][writable-_destroy] method.
|
[`stream._destroy()`][writable-_destroy] method.
|
||||||
* `final` {Function} Implementation for the
|
* `final` {Function} Implementation for the
|
||||||
[`stream._final()`][stream-_final] method.
|
[`stream._final()`][stream-_final] method.
|
||||||
|
* `construct` {Function} Implementation for the
|
||||||
|
[`stream._construct()`][writable-_construct] method.
|
||||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||||
`.destroy()` on itself after ending. **Default:** `true`.
|
`.destroy()` on itself after ending. **Default:** `true`.
|
||||||
|
|
||||||
@ -1906,6 +1912,56 @@ const myWritable = new Writable({
|
|||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### `writable._construct(callback)`
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
* `callback` {Function} Call this function (optionally with an error
|
||||||
|
argument) when the stream has finished initializing.
|
||||||
|
|
||||||
|
The `_construct()` method MUST NOT be called directly. It may be implemented
|
||||||
|
by child classes, and if so, will be called by the internal `Writable`
|
||||||
|
class methods only.
|
||||||
|
|
||||||
|
This optional function will be called in a tick after the stream constructor
|
||||||
|
has returned, delaying any `_write`, `_final` and `_destroy` calls until
|
||||||
|
`callback` is called. This is useful to initialize state or asynchronously
|
||||||
|
initialize resources before the stream can be used.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { Writable } = require('stream');
|
||||||
|
const fs = require('fs');
|
||||||
|
|
||||||
|
class WriteStream extends Writable {
|
||||||
|
constructor(filename) {
|
||||||
|
super();
|
||||||
|
this.filename = filename;
|
||||||
|
this.fd = fd;
|
||||||
|
}
|
||||||
|
_construct(callback) {
|
||||||
|
fs.open(this.filename, (fd, err) => {
|
||||||
|
if (err) {
|
||||||
|
callback(err);
|
||||||
|
} else {
|
||||||
|
this.fd = fd;
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_write(chunk, encoding, callback) {
|
||||||
|
fs.write(this.fd, chunk, callback);
|
||||||
|
}
|
||||||
|
_destroy(err, callback) {
|
||||||
|
if (this.fd) {
|
||||||
|
fs.close(this.fd, (er) => callback(er || err));
|
||||||
|
} else {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
#### `writable._write(chunk, encoding, callback)`
|
#### `writable._write(chunk, encoding, callback)`
|
||||||
<!-- YAML
|
<!-- YAML
|
||||||
changes:
|
changes:
|
||||||
@ -2130,6 +2186,8 @@ changes:
|
|||||||
method.
|
method.
|
||||||
* `destroy` {Function} Implementation for the
|
* `destroy` {Function} Implementation for the
|
||||||
[`stream._destroy()`][readable-_destroy] method.
|
[`stream._destroy()`][readable-_destroy] method.
|
||||||
|
* `construct` {Function} Implementation for the
|
||||||
|
[`stream._construct()`][readable-_construct] method.
|
||||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||||
`.destroy()` on itself after ending. **Default:** `true`.
|
`.destroy()` on itself after ending. **Default:** `true`.
|
||||||
|
|
||||||
@ -2172,6 +2230,63 @@ const myReadable = new Readable({
|
|||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### `readable._construct(callback)`
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
* `callback` {Function} Call this function (optionally with an error
|
||||||
|
argument) when the stream has finished initializing.
|
||||||
|
|
||||||
|
The `_construct()` method MUST NOT be called directly. It may be implemented
|
||||||
|
by child classes, and if so, will be called by the internal `Readable`
|
||||||
|
class methods only.
|
||||||
|
|
||||||
|
This optional function will be called by the stream constructor,
|
||||||
|
delaying any `_read` and `_destroy` calls until `callback` is called. This is
|
||||||
|
useful to initialize state or asynchronously initialize resources before the
|
||||||
|
stream can be used.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
const fs = require('fs');
|
||||||
|
|
||||||
|
class ReadStream extends Readable {
|
||||||
|
constructor(filename) {
|
||||||
|
super();
|
||||||
|
this.filename = filename;
|
||||||
|
this.fd = null;
|
||||||
|
}
|
||||||
|
_construct(callback) {
|
||||||
|
fs.open(this.filename, (fd, err) => {
|
||||||
|
if (err) {
|
||||||
|
callback(err);
|
||||||
|
} else {
|
||||||
|
this.fd = fd;
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_read(n) {
|
||||||
|
const buf = Buffer.alloc(n);
|
||||||
|
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
|
||||||
|
if (err) {
|
||||||
|
this.destroy(err);
|
||||||
|
} else {
|
||||||
|
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_destroy(err, callback) {
|
||||||
|
if (this.fd) {
|
||||||
|
fs.close(this.fd, (er) => callback(er || err));
|
||||||
|
} else {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
#### `readable._read(size)`
|
#### `readable._read(size)`
|
||||||
<!-- YAML
|
<!-- YAML
|
||||||
added: v0.9.4
|
added: v0.9.4
|
||||||
@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
|
|||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
When using pipeline:
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { Transform, pipeline } = require('stream');
|
||||||
|
const fs = require('fs');
|
||||||
|
|
||||||
|
pipeline(
|
||||||
|
fs.createReadStream('object.json')
|
||||||
|
.setEncoding('utf-8'),
|
||||||
|
new Transform({
|
||||||
|
decodeStrings: false, // Accept string input rather than Buffers
|
||||||
|
construct(callback) {
|
||||||
|
this.data = '';
|
||||||
|
callback();
|
||||||
|
},
|
||||||
|
transform(chunk, encoding, callback) {
|
||||||
|
this.data += chunk;
|
||||||
|
callback();
|
||||||
|
},
|
||||||
|
flush(callback) {
|
||||||
|
try {
|
||||||
|
// Make sure is valid json.
|
||||||
|
JSON.parse(this.data);
|
||||||
|
this.push(this.data);
|
||||||
|
} catch (err) {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
fs.createWriteStream('valid-object.json'),
|
||||||
|
(err) => {
|
||||||
|
if (err) {
|
||||||
|
console.error('failed', err);
|
||||||
|
} else {
|
||||||
|
console.log('completed');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
||||||
#### An Example Duplex Stream
|
#### An Example Duplex Stream
|
||||||
|
|
||||||
The following illustrates a simple example of a `Duplex` stream that wraps a
|
The following illustrates a simple example of a `Duplex` stream that wraps a
|
||||||
@ -2706,8 +2861,8 @@ unhandled post-destroy errors.
|
|||||||
|
|
||||||
#### Creating Readable Streams with Async Generators
|
#### Creating Readable Streams with Async Generators
|
||||||
|
|
||||||
We can construct a Node.js Readable Stream from an asynchronous generator
|
A Node.js Readable Stream can be created from an asynchronous generator using
|
||||||
using the `Readable.from()` utility method:
|
the `Readable.from()` utility method:
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const { Readable } = require('stream');
|
const { Readable } = require('stream');
|
||||||
@ -2960,6 +3115,7 @@ contain multi-byte characters.
|
|||||||
[http-incoming-message]: http.html#http_class_http_incomingmessage
|
[http-incoming-message]: http.html#http_class_http_incomingmessage
|
||||||
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
|
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
|
||||||
[object-mode]: #stream_object_mode
|
[object-mode]: #stream_object_mode
|
||||||
|
[readable-_construct]: #stream_readable_construct_callback
|
||||||
[readable-_destroy]: #stream_readable_destroy_err_callback
|
[readable-_destroy]: #stream_readable_destroy_err_callback
|
||||||
[readable-destroy]: #stream_readable_destroy_error
|
[readable-destroy]: #stream_readable_destroy_error
|
||||||
[stream-_final]: #stream_writable_final_callback
|
[stream-_final]: #stream_writable_final_callback
|
||||||
@ -2976,6 +3132,7 @@ contain multi-byte characters.
|
|||||||
[stream-uncork]: #stream_writable_uncork
|
[stream-uncork]: #stream_writable_uncork
|
||||||
[stream-write]: #stream_writable_write_chunk_encoding_callback
|
[stream-write]: #stream_writable_write_chunk_encoding_callback
|
||||||
[Stream Three States]: #stream_three_states
|
[Stream Three States]: #stream_three_states
|
||||||
|
[writable-_construct]: #stream_writable_construct_callback
|
||||||
[writable-_destroy]: #stream_writable_destroy_err_callback
|
[writable-_destroy]: #stream_writable_destroy_err_callback
|
||||||
[writable-destroy]: #stream_writable_destroy_error
|
[writable-destroy]: #stream_writable_destroy_error
|
||||||
[writable-new]: #stream_constructor_new_stream_writable_options
|
[writable-new]: #stream_constructor_new_stream_writable_options
|
||||||
|
@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
|
|||||||
this.endEmitted = false;
|
this.endEmitted = false;
|
||||||
this.reading = false;
|
this.reading = false;
|
||||||
|
|
||||||
|
// Stream is still being constructed and cannot be
|
||||||
|
// destroyed until construction finished or failed.
|
||||||
|
// Async construction is opt in, therefore we start as
|
||||||
|
// constructed.
|
||||||
|
this.constructed = true;
|
||||||
|
|
||||||
// A flag to be able to tell if the event 'readable'/'data' is emitted
|
// A flag to be able to tell if the event 'readable'/'data' is emitted
|
||||||
// immediately, or on a later tick. We set this to true at first, because
|
// immediately, or on a later tick. We set this to true at first, because
|
||||||
// any actions that shouldn't happen until "later" should generally also
|
// any actions that shouldn't happen until "later" should generally also
|
||||||
@ -197,9 +203,16 @@ function Readable(options) {
|
|||||||
|
|
||||||
if (typeof options.destroy === 'function')
|
if (typeof options.destroy === 'function')
|
||||||
this._destroy = options.destroy;
|
this._destroy = options.destroy;
|
||||||
|
|
||||||
|
if (typeof options.construct === 'function')
|
||||||
|
this._construct = options.construct;
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream.call(this, options);
|
Stream.call(this, options);
|
||||||
|
|
||||||
|
destroyImpl.construct(this, () => {
|
||||||
|
maybeReadMore(this, this._readableState);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Readable.prototype.destroy = destroyImpl.destroy;
|
Readable.prototype.destroy = destroyImpl.destroy;
|
||||||
@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// However, if we've ended, then there's no point, if we're already
|
// However, if we've ended, then there's no point, if we're already
|
||||||
// reading, then it's unnecessary, and if we're destroyed or errored,
|
// reading, then it's unnecessary, if we're constructing we have to wait,
|
||||||
// then it's not allowed.
|
// and if we're destroyed or errored, then it's not allowed,
|
||||||
if (state.ended || state.reading || state.destroyed || state.errored) {
|
if (state.ended || state.reading || state.destroyed || state.errored ||
|
||||||
|
!state.constructed) {
|
||||||
doRead = false;
|
doRead = false;
|
||||||
debug('reading or ended', doRead);
|
debug('reading, ended or constructing', doRead);
|
||||||
} else if (doRead) {
|
} else if (doRead) {
|
||||||
debug('do read');
|
debug('do read');
|
||||||
state.reading = true;
|
state.reading = true;
|
||||||
@ -587,7 +601,7 @@ function emitReadable_(stream) {
|
|||||||
// However, if we're not ended, or reading, and the length < hwm,
|
// However, if we're not ended, or reading, and the length < hwm,
|
||||||
// then go ahead and try to read some more preemptively.
|
// then go ahead and try to read some more preemptively.
|
||||||
function maybeReadMore(stream, state) {
|
function maybeReadMore(stream, state) {
|
||||||
if (!state.readingMore) {
|
if (!state.readingMore && state.constructed) {
|
||||||
state.readingMore = true;
|
state.readingMore = true;
|
||||||
process.nextTick(maybeReadMore_, stream, state);
|
process.nextTick(maybeReadMore_, stream, state);
|
||||||
}
|
}
|
||||||
|
@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
|
|||||||
// this must be 0 before 'finish' can be emitted.
|
// this must be 0 before 'finish' can be emitted.
|
||||||
this.pendingcb = 0;
|
this.pendingcb = 0;
|
||||||
|
|
||||||
|
// Stream is still being constructed and cannot be
|
||||||
|
// destroyed until construction finished or failed.
|
||||||
|
// Async construction is opt in, therefore we start as
|
||||||
|
// constructed.
|
||||||
|
this.constructed = true;
|
||||||
|
|
||||||
// Emit prefinish if the only thing we're waiting for is _write cbs
|
// Emit prefinish if the only thing we're waiting for is _write cbs
|
||||||
// This is relevant for synchronous Transform streams.
|
// This is relevant for synchronous Transform streams.
|
||||||
this.prefinished = false;
|
this.prefinished = false;
|
||||||
@ -249,9 +255,22 @@ function Writable(options) {
|
|||||||
|
|
||||||
if (typeof options.final === 'function')
|
if (typeof options.final === 'function')
|
||||||
this._final = options.final;
|
this._final = options.final;
|
||||||
|
|
||||||
|
if (typeof options.construct === 'function')
|
||||||
|
this._construct = options.construct;
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream.call(this, options);
|
Stream.call(this, options);
|
||||||
|
|
||||||
|
destroyImpl.construct(this, () => {
|
||||||
|
const state = this._writableState;
|
||||||
|
|
||||||
|
if (!state.writing) {
|
||||||
|
clearBuffer(this, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
finishMaybe(this, state);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise people can pipe Writable streams, which is just wrong.
|
// Otherwise people can pipe Writable streams, which is just wrong.
|
||||||
@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
|
|||||||
|
|
||||||
state.length += len;
|
state.length += len;
|
||||||
|
|
||||||
if (state.writing || state.corked || state.errored) {
|
if (state.writing || state.corked || state.errored || !state.constructed) {
|
||||||
state.buffered.push({ chunk, encoding, callback });
|
state.buffered.push({ chunk, encoding, callback });
|
||||||
if (state.allBuffers && encoding !== 'buffer') {
|
if (state.allBuffers && encoding !== 'buffer') {
|
||||||
state.allBuffers = false;
|
state.allBuffers = false;
|
||||||
@ -492,7 +511,10 @@ function errorBuffer(state, err) {
|
|||||||
|
|
||||||
// If there's something in the buffer waiting, then process it.
|
// If there's something in the buffer waiting, then process it.
|
||||||
function clearBuffer(stream, state) {
|
function clearBuffer(stream, state) {
|
||||||
if (state.corked || state.bufferProcessing || state.destroyed) {
|
if (state.corked ||
|
||||||
|
state.bufferProcessing ||
|
||||||
|
state.destroyed ||
|
||||||
|
!state.constructed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
|||||||
|
|
||||||
function needFinish(state) {
|
function needFinish(state) {
|
||||||
return (state.ending &&
|
return (state.ending &&
|
||||||
|
state.constructed &&
|
||||||
state.length === 0 &&
|
state.length === 0 &&
|
||||||
!state.errored &&
|
!state.errored &&
|
||||||
state.buffered.length === 0 &&
|
state.buffered.length === 0 &&
|
||||||
|
@ -1,10 +1,20 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
|
const {
|
||||||
|
ERR_MULTIPLE_CALLBACK
|
||||||
|
} = require('internal/errors').codes;
|
||||||
|
const { Symbol } = primordials;
|
||||||
|
|
||||||
|
const kDestroy = Symbol('kDestroy');
|
||||||
|
const kConstruct = Symbol('kConstruct');
|
||||||
|
|
||||||
// Backwards compat. cb() is undocumented and unused in core but
|
// Backwards compat. cb() is undocumented and unused in core but
|
||||||
// unfortunately might be used by modules.
|
// unfortunately might be used by modules.
|
||||||
function destroy(err, cb) {
|
function destroy(err, cb) {
|
||||||
const r = this._readableState;
|
const r = this._readableState;
|
||||||
const w = this._writableState;
|
const w = this._writableState;
|
||||||
|
// With duplex streams we use the writable side for state.
|
||||||
|
const s = w || r;
|
||||||
|
|
||||||
if ((w && w.destroyed) || (r && r.destroyed)) {
|
if ((w && w.destroyed) || (r && r.destroyed)) {
|
||||||
if (typeof cb === 'function') {
|
if (typeof cb === 'function') {
|
||||||
@ -33,7 +43,23 @@ function destroy(err, cb) {
|
|||||||
r.destroyed = true;
|
r.destroyed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
this._destroy(err || null, (err) => {
|
// If still constructing then defer calling _destroy.
|
||||||
|
if (!s.constructed) {
|
||||||
|
this.once(kDestroy, function(er) {
|
||||||
|
_destroy(this, err || er, cb);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
_destroy(this, err, cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
function _destroy(self, err, cb) {
|
||||||
|
self._destroy(err || null, (err) => {
|
||||||
|
const r = self._readableState;
|
||||||
|
const w = self._writableState;
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
if (w) {
|
if (w) {
|
||||||
w.errored = true;
|
w.errored = true;
|
||||||
@ -55,13 +81,11 @@ function destroy(err, cb) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
process.nextTick(emitErrorCloseNT, this, err);
|
process.nextTick(emitErrorCloseNT, self, err);
|
||||||
} else {
|
} else {
|
||||||
process.nextTick(emitCloseNT, this);
|
process.nextTick(emitCloseNT, self);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function emitErrorCloseNT(self, err) {
|
function emitErrorCloseNT(self, err) {
|
||||||
@ -108,6 +132,7 @@ function undestroy() {
|
|||||||
const w = this._writableState;
|
const w = this._writableState;
|
||||||
|
|
||||||
if (r) {
|
if (r) {
|
||||||
|
r.constructed = true;
|
||||||
r.closed = false;
|
r.closed = false;
|
||||||
r.closeEmitted = false;
|
r.closeEmitted = false;
|
||||||
r.destroyed = false;
|
r.destroyed = false;
|
||||||
@ -119,6 +144,7 @@ function undestroy() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (w) {
|
if (w) {
|
||||||
|
w.constructed = true;
|
||||||
w.destroyed = false;
|
w.destroyed = false;
|
||||||
w.closed = false;
|
w.closed = false;
|
||||||
w.closeEmitted = false;
|
w.closeEmitted = false;
|
||||||
@ -155,7 +181,6 @@ function errorOrDestroy(stream, err, sync) {
|
|||||||
if (r) {
|
if (r) {
|
||||||
r.errored = true;
|
r.errored = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sync) {
|
if (sync) {
|
||||||
process.nextTick(emitErrorNT, stream, err);
|
process.nextTick(emitErrorNT, stream, err);
|
||||||
} else {
|
} else {
|
||||||
@ -164,6 +189,66 @@ function errorOrDestroy(stream, err, sync) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function construct(stream, cb) {
|
||||||
|
if (typeof stream._construct !== 'function') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.once(kConstruct, cb);
|
||||||
|
|
||||||
|
if (stream.listenerCount(kConstruct) > 1) {
|
||||||
|
// Duplex
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const r = stream._readableState;
|
||||||
|
const w = stream._writableState;
|
||||||
|
|
||||||
|
if (r) {
|
||||||
|
r.constructed = false;
|
||||||
|
}
|
||||||
|
if (w) {
|
||||||
|
w.constructed = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
process.nextTick(constructNT, stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
function constructNT(stream) {
|
||||||
|
const r = stream._readableState;
|
||||||
|
const w = stream._writableState;
|
||||||
|
// With duplex streams we use the writable side for state.
|
||||||
|
const s = w || r;
|
||||||
|
|
||||||
|
let called = false;
|
||||||
|
stream._construct((err) => {
|
||||||
|
if (r) {
|
||||||
|
r.constructed = true;
|
||||||
|
}
|
||||||
|
if (w) {
|
||||||
|
w.constructed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (called) {
|
||||||
|
err = new ERR_MULTIPLE_CALLBACK();
|
||||||
|
} else {
|
||||||
|
called = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (s.destroyed) {
|
||||||
|
stream.emit(kDestroy, err);
|
||||||
|
} else if (err) {
|
||||||
|
errorOrDestroy(stream, err, true);
|
||||||
|
} else {
|
||||||
|
process.nextTick(emitConstructNT, stream);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function emitConstructNT(stream) {
|
||||||
|
stream.emit(kConstruct);
|
||||||
|
}
|
||||||
|
|
||||||
function isRequest(stream) {
|
function isRequest(stream) {
|
||||||
return stream && stream.setHeader && typeof stream.abort === 'function';
|
return stream && stream.setHeader && typeof stream.abort === 'function';
|
||||||
}
|
}
|
||||||
@ -177,6 +262,7 @@ function destroyer(stream, err) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
|
construct,
|
||||||
destroyer,
|
destroyer,
|
||||||
destroy,
|
destroy,
|
||||||
undestroy,
|
undestroy,
|
||||||
|
244
test/parallel/test-stream-construct.js
Normal file
244
test/parallel/test-stream-construct.js
Normal file
@ -0,0 +1,244 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
const { Writable, Readable, Duplex } = require('stream');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
{
|
||||||
|
// Multiple callback.
|
||||||
|
new Writable({
|
||||||
|
construct: common.mustCall((callback) => {
|
||||||
|
callback();
|
||||||
|
callback();
|
||||||
|
})
|
||||||
|
}).on('error', common.expectsError({
|
||||||
|
name: 'Error',
|
||||||
|
code: 'ERR_MULTIPLE_CALLBACK'
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Multiple callback.
|
||||||
|
new Readable({
|
||||||
|
construct: common.mustCall((callback) => {
|
||||||
|
callback();
|
||||||
|
callback();
|
||||||
|
})
|
||||||
|
}).on('error', common.expectsError({
|
||||||
|
name: 'Error',
|
||||||
|
code: 'ERR_MULTIPLE_CALLBACK'
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Synchronous error.
|
||||||
|
|
||||||
|
new Writable({
|
||||||
|
construct: common.mustCall((callback) => {
|
||||||
|
callback(new Error('test'));
|
||||||
|
})
|
||||||
|
}).on('error', common.expectsError({
|
||||||
|
name: 'Error',
|
||||||
|
message: 'test'
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Synchronous error.
|
||||||
|
|
||||||
|
new Readable({
|
||||||
|
construct: common.mustCall((callback) => {
|
||||||
|
callback(new Error('test'));
|
||||||
|
})
|
||||||
|
}).on('error', common.expectsError({
|
||||||
|
name: 'Error',
|
||||||
|
message: 'test'
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Asynchronous error.
|
||||||
|
|
||||||
|
new Writable({
|
||||||
|
construct: common.mustCall((callback) => {
|
||||||
|
process.nextTick(callback, new Error('test'));
|
||||||
|
})
|
||||||
|
}).on('error', common.expectsError({
|
||||||
|
name: 'Error',
|
||||||
|
message: 'test'
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Asynchronous error.
|
||||||
|
|
||||||
|
new Readable({
|
||||||
|
construct: common.mustCall((callback) => {
|
||||||
|
process.nextTick(callback, new Error('test'));
|
||||||
|
})
|
||||||
|
}).on('error', common.expectsError({
|
||||||
|
name: 'Error',
|
||||||
|
message: 'test'
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
function testDestroy(factory) {
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const s = factory({
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
s.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
s.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const s = factory({
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
s.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
s.destroy(null, () => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const s = factory({
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
s.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
s.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const s = factory({
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
s.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
s.on('error', common.mustCall((err) => {
|
||||||
|
assert.strictEqual(err.message, 'kaboom');
|
||||||
|
}));
|
||||||
|
s.destroy(new Error('kaboom'), (err) => {
|
||||||
|
assert.strictEqual(err.message, 'kaboom');
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const s = factory({
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
s.on('error', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
s.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
s.destroy(new Error());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
testDestroy((opts) => new Readable({
|
||||||
|
read: common.mustNotCall(),
|
||||||
|
...opts
|
||||||
|
}));
|
||||||
|
testDestroy((opts) => new Writable({
|
||||||
|
write: common.mustNotCall(),
|
||||||
|
final: common.mustNotCall(),
|
||||||
|
...opts
|
||||||
|
}));
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const r = new Readable({
|
||||||
|
autoDestroy: true,
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
}),
|
||||||
|
read: common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
r.push(null);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
r.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
r.on('data', common.mustNotCall());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const w = new Writable({
|
||||||
|
autoDestroy: true,
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
}),
|
||||||
|
write: common.mustCall((chunk, encoding, cb) => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
process.nextTick(cb);
|
||||||
|
}),
|
||||||
|
final: common.mustCall((cb) => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
w.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
w.end('data');
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let constructed = false;
|
||||||
|
const w = new Writable({
|
||||||
|
autoDestroy: true,
|
||||||
|
construct: common.mustCall((cb) => {
|
||||||
|
constructed = true;
|
||||||
|
process.nextTick(cb);
|
||||||
|
}),
|
||||||
|
write: common.mustNotCall(),
|
||||||
|
final: common.mustCall((cb) => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
process.nextTick(cb);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
w.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(constructed, true);
|
||||||
|
}));
|
||||||
|
w.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
new Duplex({
|
||||||
|
construct: common.mustCall()
|
||||||
|
});
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user