mirror of
https://github.com/nodejs/node.git
synced 2025-05-12 20:54:38 +00:00
stream: construct
Provide a standardized way of asynchronously creating and initializing resources before performing any work. Refs: https://github.com/nodejs/node/issues/29314 PR-URL: https://github.com/nodejs/node/pull/29656 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
This commit is contained in:
parent
9949a2e1e3
commit
fb8cc72e73
@ -550,8 +550,7 @@ added: v9.3.0
|
||||
|
||||
* {number}
|
||||
|
||||
Return the value of `highWaterMark` passed when constructing this
|
||||
`Writable`.
|
||||
Return the value of `highWaterMark` passed when creating this `Writable`.
|
||||
|
||||
##### `writable.writableLength`
|
||||
<!-- YAML
|
||||
@ -1193,8 +1192,7 @@ added: v9.3.0
|
||||
|
||||
* {number}
|
||||
|
||||
Returns the value of `highWaterMark` passed when constructing this
|
||||
`Readable`.
|
||||
Returns the value of `highWaterMark` passed when creating this `Readable`.
|
||||
|
||||
##### `readable.readableLength`
|
||||
<!-- YAML
|
||||
@ -1792,7 +1790,7 @@ expectations.
|
||||
added: v1.2.0
|
||||
-->
|
||||
|
||||
For many simple cases, it is possible to construct a stream without relying on
|
||||
For many simple cases, it is possible to create a stream without relying on
|
||||
inheritance. This can be accomplished by directly creating instances of the
|
||||
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
|
||||
objects and passing appropriate methods as constructor options.
|
||||
@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
|
||||
const { Writable } = require('stream');
|
||||
|
||||
const myWritable = new Writable({
|
||||
construct(callback) {
|
||||
// Initialize state and load resources...
|
||||
},
|
||||
write(chunk, encoding, callback) {
|
||||
// ...
|
||||
},
|
||||
destroy() {
|
||||
// Free resources...
|
||||
}
|
||||
});
|
||||
```
|
||||
@ -1861,6 +1865,8 @@ changes:
|
||||
[`stream._destroy()`][writable-_destroy] method.
|
||||
* `final` {Function} Implementation for the
|
||||
[`stream._final()`][stream-_final] method.
|
||||
* `construct` {Function} Implementation for the
|
||||
[`stream._construct()`][writable-_construct] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `true`.
|
||||
|
||||
@ -1906,6 +1912,56 @@ const myWritable = new Writable({
|
||||
});
|
||||
```
|
||||
|
||||
#### `writable._construct(callback)`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
* `callback` {Function} Call this function (optionally with an error
|
||||
argument) when the stream has finished initializing.
|
||||
|
||||
The `_construct()` method MUST NOT be called directly. It may be implemented
|
||||
by child classes, and if so, will be called by the internal `Writable`
|
||||
class methods only.
|
||||
|
||||
This optional function will be called in a tick after the stream constructor
|
||||
has returned, delaying any `_write`, `_final` and `_destroy` calls until
|
||||
`callback` is called. This is useful to initialize state or asynchronously
|
||||
initialize resources before the stream can be used.
|
||||
|
||||
```js
|
||||
const { Writable } = require('stream');
|
||||
const fs = require('fs');
|
||||
|
||||
class WriteStream extends Writable {
|
||||
constructor(filename) {
|
||||
super();
|
||||
this.filename = filename;
|
||||
this.fd = fd;
|
||||
}
|
||||
_construct(callback) {
|
||||
fs.open(this.filename, (fd, err) => {
|
||||
if (err) {
|
||||
callback(err);
|
||||
} else {
|
||||
this.fd = fd;
|
||||
callback();
|
||||
}
|
||||
});
|
||||
}
|
||||
_write(chunk, encoding, callback) {
|
||||
fs.write(this.fd, chunk, callback);
|
||||
}
|
||||
_destroy(err, callback) {
|
||||
if (this.fd) {
|
||||
fs.close(this.fd, (er) => callback(er || err));
|
||||
} else {
|
||||
callback(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### `writable._write(chunk, encoding, callback)`
|
||||
<!-- YAML
|
||||
changes:
|
||||
@ -2130,6 +2186,8 @@ changes:
|
||||
method.
|
||||
* `destroy` {Function} Implementation for the
|
||||
[`stream._destroy()`][readable-_destroy] method.
|
||||
* `construct` {Function} Implementation for the
|
||||
[`stream._construct()`][readable-_construct] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `true`.
|
||||
|
||||
@ -2172,6 +2230,63 @@ const myReadable = new Readable({
|
||||
});
|
||||
```
|
||||
|
||||
#### `readable._construct(callback)`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
* `callback` {Function} Call this function (optionally with an error
|
||||
argument) when the stream has finished initializing.
|
||||
|
||||
The `_construct()` method MUST NOT be called directly. It may be implemented
|
||||
by child classes, and if so, will be called by the internal `Readable`
|
||||
class methods only.
|
||||
|
||||
This optional function will be called by the stream constructor,
|
||||
delaying any `_read` and `_destroy` calls until `callback` is called. This is
|
||||
useful to initialize state or asynchronously initialize resources before the
|
||||
stream can be used.
|
||||
|
||||
```js
|
||||
const { Readable } = require('stream');
|
||||
const fs = require('fs');
|
||||
|
||||
class ReadStream extends Readable {
|
||||
constructor(filename) {
|
||||
super();
|
||||
this.filename = filename;
|
||||
this.fd = null;
|
||||
}
|
||||
_construct(callback) {
|
||||
fs.open(this.filename, (fd, err) => {
|
||||
if (err) {
|
||||
callback(err);
|
||||
} else {
|
||||
this.fd = fd;
|
||||
callback();
|
||||
}
|
||||
});
|
||||
}
|
||||
_read(n) {
|
||||
const buf = Buffer.alloc(n);
|
||||
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
|
||||
if (err) {
|
||||
this.destroy(err);
|
||||
} else {
|
||||
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
|
||||
}
|
||||
});
|
||||
}
|
||||
_destroy(err, callback) {
|
||||
if (this.fd) {
|
||||
fs.close(this.fd, (er) => callback(er || err));
|
||||
} else {
|
||||
callback(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### `readable._read(size)`
|
||||
<!-- YAML
|
||||
added: v0.9.4
|
||||
@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
|
||||
});
|
||||
```
|
||||
|
||||
When using pipeline:
|
||||
|
||||
```js
|
||||
const { Transform, pipeline } = require('stream');
|
||||
const fs = require('fs');
|
||||
|
||||
pipeline(
|
||||
fs.createReadStream('object.json')
|
||||
.setEncoding('utf-8'),
|
||||
new Transform({
|
||||
decodeStrings: false, // Accept string input rather than Buffers
|
||||
construct(callback) {
|
||||
this.data = '';
|
||||
callback();
|
||||
},
|
||||
transform(chunk, encoding, callback) {
|
||||
this.data += chunk;
|
||||
callback();
|
||||
},
|
||||
flush(callback) {
|
||||
try {
|
||||
// Make sure is valid json.
|
||||
JSON.parse(this.data);
|
||||
this.push(this.data);
|
||||
} catch (err) {
|
||||
callback(err);
|
||||
}
|
||||
}
|
||||
}),
|
||||
fs.createWriteStream('valid-object.json'),
|
||||
(err) => {
|
||||
if (err) {
|
||||
console.error('failed', err);
|
||||
} else {
|
||||
console.log('completed');
|
||||
}
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
#### An Example Duplex Stream
|
||||
|
||||
The following illustrates a simple example of a `Duplex` stream that wraps a
|
||||
@ -2706,8 +2861,8 @@ unhandled post-destroy errors.
|
||||
|
||||
#### Creating Readable Streams with Async Generators
|
||||
|
||||
We can construct a Node.js Readable Stream from an asynchronous generator
|
||||
using the `Readable.from()` utility method:
|
||||
A Node.js Readable Stream can be created from an asynchronous generator using
|
||||
the `Readable.from()` utility method:
|
||||
|
||||
```js
|
||||
const { Readable } = require('stream');
|
||||
@ -2960,6 +3115,7 @@ contain multi-byte characters.
|
||||
[http-incoming-message]: http.html#http_class_http_incomingmessage
|
||||
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
|
||||
[object-mode]: #stream_object_mode
|
||||
[readable-_construct]: #stream_readable_construct_callback
|
||||
[readable-_destroy]: #stream_readable_destroy_err_callback
|
||||
[readable-destroy]: #stream_readable_destroy_error
|
||||
[stream-_final]: #stream_writable_final_callback
|
||||
@ -2976,6 +3132,7 @@ contain multi-byte characters.
|
||||
[stream-uncork]: #stream_writable_uncork
|
||||
[stream-write]: #stream_writable_write_chunk_encoding_callback
|
||||
[Stream Three States]: #stream_three_states
|
||||
[writable-_construct]: #stream_writable_construct_callback
|
||||
[writable-_destroy]: #stream_writable_destroy_err_callback
|
||||
[writable-destroy]: #stream_writable_destroy_error
|
||||
[writable-new]: #stream_constructor_new_stream_writable_options
|
||||
|
@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
|
||||
this.endEmitted = false;
|
||||
this.reading = false;
|
||||
|
||||
// Stream is still being constructed and cannot be
|
||||
// destroyed until construction finished or failed.
|
||||
// Async construction is opt in, therefore we start as
|
||||
// constructed.
|
||||
this.constructed = true;
|
||||
|
||||
// A flag to be able to tell if the event 'readable'/'data' is emitted
|
||||
// immediately, or on a later tick. We set this to true at first, because
|
||||
// any actions that shouldn't happen until "later" should generally also
|
||||
@ -197,9 +203,16 @@ function Readable(options) {
|
||||
|
||||
if (typeof options.destroy === 'function')
|
||||
this._destroy = options.destroy;
|
||||
|
||||
if (typeof options.construct === 'function')
|
||||
this._construct = options.construct;
|
||||
}
|
||||
|
||||
Stream.call(this, options);
|
||||
|
||||
destroyImpl.construct(this, () => {
|
||||
maybeReadMore(this, this._readableState);
|
||||
});
|
||||
}
|
||||
|
||||
Readable.prototype.destroy = destroyImpl.destroy;
|
||||
@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
|
||||
}
|
||||
|
||||
// However, if we've ended, then there's no point, if we're already
|
||||
// reading, then it's unnecessary, and if we're destroyed or errored,
|
||||
// then it's not allowed.
|
||||
if (state.ended || state.reading || state.destroyed || state.errored) {
|
||||
// reading, then it's unnecessary, if we're constructing we have to wait,
|
||||
// and if we're destroyed or errored, then it's not allowed,
|
||||
if (state.ended || state.reading || state.destroyed || state.errored ||
|
||||
!state.constructed) {
|
||||
doRead = false;
|
||||
debug('reading or ended', doRead);
|
||||
debug('reading, ended or constructing', doRead);
|
||||
} else if (doRead) {
|
||||
debug('do read');
|
||||
state.reading = true;
|
||||
@ -587,7 +601,7 @@ function emitReadable_(stream) {
|
||||
// However, if we're not ended, or reading, and the length < hwm,
|
||||
// then go ahead and try to read some more preemptively.
|
||||
function maybeReadMore(stream, state) {
|
||||
if (!state.readingMore) {
|
||||
if (!state.readingMore && state.constructed) {
|
||||
state.readingMore = true;
|
||||
process.nextTick(maybeReadMore_, stream, state);
|
||||
}
|
||||
|
@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
|
||||
// this must be 0 before 'finish' can be emitted.
|
||||
this.pendingcb = 0;
|
||||
|
||||
// Stream is still being constructed and cannot be
|
||||
// destroyed until construction finished or failed.
|
||||
// Async construction is opt in, therefore we start as
|
||||
// constructed.
|
||||
this.constructed = true;
|
||||
|
||||
// Emit prefinish if the only thing we're waiting for is _write cbs
|
||||
// This is relevant for synchronous Transform streams.
|
||||
this.prefinished = false;
|
||||
@ -249,9 +255,22 @@ function Writable(options) {
|
||||
|
||||
if (typeof options.final === 'function')
|
||||
this._final = options.final;
|
||||
|
||||
if (typeof options.construct === 'function')
|
||||
this._construct = options.construct;
|
||||
}
|
||||
|
||||
Stream.call(this, options);
|
||||
|
||||
destroyImpl.construct(this, () => {
|
||||
const state = this._writableState;
|
||||
|
||||
if (!state.writing) {
|
||||
clearBuffer(this, state);
|
||||
}
|
||||
|
||||
finishMaybe(this, state);
|
||||
});
|
||||
}
|
||||
|
||||
// Otherwise people can pipe Writable streams, which is just wrong.
|
||||
@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
|
||||
|
||||
state.length += len;
|
||||
|
||||
if (state.writing || state.corked || state.errored) {
|
||||
if (state.writing || state.corked || state.errored || !state.constructed) {
|
||||
state.buffered.push({ chunk, encoding, callback });
|
||||
if (state.allBuffers && encoding !== 'buffer') {
|
||||
state.allBuffers = false;
|
||||
@ -492,7 +511,10 @@ function errorBuffer(state, err) {
|
||||
|
||||
// If there's something in the buffer waiting, then process it.
|
||||
function clearBuffer(stream, state) {
|
||||
if (state.corked || state.bufferProcessing || state.destroyed) {
|
||||
if (state.corked ||
|
||||
state.bufferProcessing ||
|
||||
state.destroyed ||
|
||||
!state.constructed) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
||||
|
||||
function needFinish(state) {
|
||||
return (state.ending &&
|
||||
state.constructed &&
|
||||
state.length === 0 &&
|
||||
!state.errored &&
|
||||
state.buffered.length === 0 &&
|
||||
|
@ -1,10 +1,20 @@
|
||||
'use strict';
|
||||
|
||||
const {
|
||||
ERR_MULTIPLE_CALLBACK
|
||||
} = require('internal/errors').codes;
|
||||
const { Symbol } = primordials;
|
||||
|
||||
const kDestroy = Symbol('kDestroy');
|
||||
const kConstruct = Symbol('kConstruct');
|
||||
|
||||
// Backwards compat. cb() is undocumented and unused in core but
|
||||
// unfortunately might be used by modules.
|
||||
function destroy(err, cb) {
|
||||
const r = this._readableState;
|
||||
const w = this._writableState;
|
||||
// With duplex streams we use the writable side for state.
|
||||
const s = w || r;
|
||||
|
||||
if ((w && w.destroyed) || (r && r.destroyed)) {
|
||||
if (typeof cb === 'function') {
|
||||
@ -33,7 +43,23 @@ function destroy(err, cb) {
|
||||
r.destroyed = true;
|
||||
}
|
||||
|
||||
this._destroy(err || null, (err) => {
|
||||
// If still constructing then defer calling _destroy.
|
||||
if (!s.constructed) {
|
||||
this.once(kDestroy, function(er) {
|
||||
_destroy(this, err || er, cb);
|
||||
});
|
||||
} else {
|
||||
_destroy(this, err, cb);
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
function _destroy(self, err, cb) {
|
||||
self._destroy(err || null, (err) => {
|
||||
const r = self._readableState;
|
||||
const w = self._writableState;
|
||||
|
||||
if (err) {
|
||||
if (w) {
|
||||
w.errored = true;
|
||||
@ -55,13 +81,11 @@ function destroy(err, cb) {
|
||||
}
|
||||
|
||||
if (err) {
|
||||
process.nextTick(emitErrorCloseNT, this, err);
|
||||
process.nextTick(emitErrorCloseNT, self, err);
|
||||
} else {
|
||||
process.nextTick(emitCloseNT, this);
|
||||
process.nextTick(emitCloseNT, self);
|
||||
}
|
||||
});
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
function emitErrorCloseNT(self, err) {
|
||||
@ -108,6 +132,7 @@ function undestroy() {
|
||||
const w = this._writableState;
|
||||
|
||||
if (r) {
|
||||
r.constructed = true;
|
||||
r.closed = false;
|
||||
r.closeEmitted = false;
|
||||
r.destroyed = false;
|
||||
@ -119,6 +144,7 @@ function undestroy() {
|
||||
}
|
||||
|
||||
if (w) {
|
||||
w.constructed = true;
|
||||
w.destroyed = false;
|
||||
w.closed = false;
|
||||
w.closeEmitted = false;
|
||||
@ -155,7 +181,6 @@ function errorOrDestroy(stream, err, sync) {
|
||||
if (r) {
|
||||
r.errored = true;
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
process.nextTick(emitErrorNT, stream, err);
|
||||
} else {
|
||||
@ -164,6 +189,66 @@ function errorOrDestroy(stream, err, sync) {
|
||||
}
|
||||
}
|
||||
|
||||
function construct(stream, cb) {
|
||||
if (typeof stream._construct !== 'function') {
|
||||
return;
|
||||
}
|
||||
|
||||
stream.once(kConstruct, cb);
|
||||
|
||||
if (stream.listenerCount(kConstruct) > 1) {
|
||||
// Duplex
|
||||
return;
|
||||
}
|
||||
|
||||
const r = stream._readableState;
|
||||
const w = stream._writableState;
|
||||
|
||||
if (r) {
|
||||
r.constructed = false;
|
||||
}
|
||||
if (w) {
|
||||
w.constructed = false;
|
||||
}
|
||||
|
||||
process.nextTick(constructNT, stream);
|
||||
}
|
||||
|
||||
function constructNT(stream) {
|
||||
const r = stream._readableState;
|
||||
const w = stream._writableState;
|
||||
// With duplex streams we use the writable side for state.
|
||||
const s = w || r;
|
||||
|
||||
let called = false;
|
||||
stream._construct((err) => {
|
||||
if (r) {
|
||||
r.constructed = true;
|
||||
}
|
||||
if (w) {
|
||||
w.constructed = true;
|
||||
}
|
||||
|
||||
if (called) {
|
||||
err = new ERR_MULTIPLE_CALLBACK();
|
||||
} else {
|
||||
called = true;
|
||||
}
|
||||
|
||||
if (s.destroyed) {
|
||||
stream.emit(kDestroy, err);
|
||||
} else if (err) {
|
||||
errorOrDestroy(stream, err, true);
|
||||
} else {
|
||||
process.nextTick(emitConstructNT, stream);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function emitConstructNT(stream) {
|
||||
stream.emit(kConstruct);
|
||||
}
|
||||
|
||||
function isRequest(stream) {
|
||||
return stream && stream.setHeader && typeof stream.abort === 'function';
|
||||
}
|
||||
@ -177,6 +262,7 @@ function destroyer(stream, err) {
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
construct,
|
||||
destroyer,
|
||||
destroy,
|
||||
undestroy,
|
||||
|
244
test/parallel/test-stream-construct.js
Normal file
244
test/parallel/test-stream-construct.js
Normal file
@ -0,0 +1,244 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { Writable, Readable, Duplex } = require('stream');
|
||||
const assert = require('assert');
|
||||
|
||||
{
|
||||
// Multiple callback.
|
||||
new Writable({
|
||||
construct: common.mustCall((callback) => {
|
||||
callback();
|
||||
callback();
|
||||
})
|
||||
}).on('error', common.expectsError({
|
||||
name: 'Error',
|
||||
code: 'ERR_MULTIPLE_CALLBACK'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Multiple callback.
|
||||
new Readable({
|
||||
construct: common.mustCall((callback) => {
|
||||
callback();
|
||||
callback();
|
||||
})
|
||||
}).on('error', common.expectsError({
|
||||
name: 'Error',
|
||||
code: 'ERR_MULTIPLE_CALLBACK'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Synchronous error.
|
||||
|
||||
new Writable({
|
||||
construct: common.mustCall((callback) => {
|
||||
callback(new Error('test'));
|
||||
})
|
||||
}).on('error', common.expectsError({
|
||||
name: 'Error',
|
||||
message: 'test'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Synchronous error.
|
||||
|
||||
new Readable({
|
||||
construct: common.mustCall((callback) => {
|
||||
callback(new Error('test'));
|
||||
})
|
||||
}).on('error', common.expectsError({
|
||||
name: 'Error',
|
||||
message: 'test'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Asynchronous error.
|
||||
|
||||
new Writable({
|
||||
construct: common.mustCall((callback) => {
|
||||
process.nextTick(callback, new Error('test'));
|
||||
})
|
||||
}).on('error', common.expectsError({
|
||||
name: 'Error',
|
||||
message: 'test'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Asynchronous error.
|
||||
|
||||
new Readable({
|
||||
construct: common.mustCall((callback) => {
|
||||
process.nextTick(callback, new Error('test'));
|
||||
})
|
||||
}).on('error', common.expectsError({
|
||||
name: 'Error',
|
||||
message: 'test'
|
||||
}));
|
||||
}
|
||||
|
||||
function testDestroy(factory) {
|
||||
{
|
||||
let constructed = false;
|
||||
const s = factory({
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
s.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
s.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const s = factory({
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
s.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
s.destroy(null, () => {
|
||||
assert.strictEqual(constructed, true);
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const s = factory({
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
s.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
s.destroy();
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const s = factory({
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
s.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
s.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
s.destroy(new Error('kaboom'), (err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
assert.strictEqual(constructed, true);
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const s = factory({
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
s.on('error', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
s.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
s.destroy(new Error());
|
||||
}
|
||||
}
|
||||
testDestroy((opts) => new Readable({
|
||||
read: common.mustNotCall(),
|
||||
...opts
|
||||
}));
|
||||
testDestroy((opts) => new Writable({
|
||||
write: common.mustNotCall(),
|
||||
final: common.mustNotCall(),
|
||||
...opts
|
||||
}));
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const r = new Readable({
|
||||
autoDestroy: true,
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
}),
|
||||
read: common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
r.push(null);
|
||||
})
|
||||
});
|
||||
r.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
r.on('data', common.mustNotCall());
|
||||
}
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const w = new Writable({
|
||||
autoDestroy: true,
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
}),
|
||||
write: common.mustCall((chunk, encoding, cb) => {
|
||||
assert.strictEqual(constructed, true);
|
||||
process.nextTick(cb);
|
||||
}),
|
||||
final: common.mustCall((cb) => {
|
||||
assert.strictEqual(constructed, true);
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
w.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
w.end('data');
|
||||
}
|
||||
|
||||
{
|
||||
let constructed = false;
|
||||
const w = new Writable({
|
||||
autoDestroy: true,
|
||||
construct: common.mustCall((cb) => {
|
||||
constructed = true;
|
||||
process.nextTick(cb);
|
||||
}),
|
||||
write: common.mustNotCall(),
|
||||
final: common.mustCall((cb) => {
|
||||
assert.strictEqual(constructed, true);
|
||||
process.nextTick(cb);
|
||||
})
|
||||
});
|
||||
w.on('close', common.mustCall(() => {
|
||||
assert.strictEqual(constructed, true);
|
||||
}));
|
||||
w.end();
|
||||
}
|
||||
|
||||
{
|
||||
new Duplex({
|
||||
construct: common.mustCall()
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue
Block a user