stream: construct

Provide a standardized way of asynchronously creating and
initializing resources before performing any work.

Refs: https://github.com/nodejs/node/issues/29314

PR-URL: https://github.com/nodejs/node/pull/29656
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
This commit is contained in:
Robert Nagy 2019-08-25 18:13:27 +02:00
parent 9949a2e1e3
commit fb8cc72e73
5 changed files with 544 additions and 20 deletions

View File

@ -550,8 +550,7 @@ added: v9.3.0
* {number}
Return the value of `highWaterMark` passed when constructing this
`Writable`.
Return the value of `highWaterMark` passed when creating this `Writable`.
##### `writable.writableLength`
<!-- YAML
@ -1193,8 +1192,7 @@ added: v9.3.0
* {number}
Returns the value of `highWaterMark` passed when constructing this
`Readable`.
Returns the value of `highWaterMark` passed when creating this `Readable`.
##### `readable.readableLength`
<!-- YAML
@ -1792,7 +1790,7 @@ expectations.
added: v1.2.0
-->
For many simple cases, it is possible to construct a stream without relying on
For many simple cases, it is possible to create a stream without relying on
inheritance. This can be accomplished by directly creating instances of the
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
objects and passing appropriate methods as constructor options.
@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
const { Writable } = require('stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
}
});
```
@ -1861,6 +1865,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][writable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
@ -1906,6 +1912,56 @@ const myWritable = new Writable({
});
```
#### `writable._construct(callback)`
<!-- YAML
added: REPLACEME
-->
* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.
The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.
This optional function will be called in a tick after the stream constructor
has returned, delaying any `_write`, `_final` and `_destroy` calls until
`callback` is called. This is useful to initialize state or asynchronously
initialize resources before the stream can be used.
```js
const { Writable } = require('stream');
const fs = require('fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = fd;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```
#### `writable._write(chunk, encoding, callback)`
<!-- YAML
changes:
@ -2130,6 +2186,8 @@ changes:
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
@ -2172,6 +2230,63 @@ const myReadable = new Readable({
});
```
#### `readable._construct(callback)`
<!-- YAML
added: REPLACEME
-->
* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.
The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Readable`
class methods only.
This optional function will be called by the stream constructor,
delaying any `_read` and `_destroy` calls until `callback` is called. This is
useful to initialize state or asynchronously initialize resources before the
stream can be used.
```js
const { Readable } = require('stream');
const fs = require('fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```
#### `readable._read(size)`
<!-- YAML
added: v0.9.4
@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
});
```
When using pipeline:
```js
const { Transform, pipeline } = require('stream');
const fs = require('fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
}
);
```
#### An Example Duplex Stream
The following illustrates a simple example of a `Duplex` stream that wraps a
@ -2706,8 +2861,8 @@ unhandled post-destroy errors.
#### Creating Readable Streams with Async Generators
We can construct a Node.js Readable Stream from an asynchronous generator
using the `Readable.from()` utility method:
A Node.js Readable Stream can be created from an asynchronous generator using
the `Readable.from()` utility method:
```js
const { Readable } = require('stream');
@ -2960,6 +3115,7 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[object-mode]: #stream_object_mode
[readable-_construct]: #stream_readable_construct_callback
[readable-_destroy]: #stream_readable_destroy_err_callback
[readable-destroy]: #stream_readable_destroy_error
[stream-_final]: #stream_writable_final_callback
@ -2976,6 +3132,7 @@ contain multi-byte characters.
[stream-uncork]: #stream_writable_uncork
[stream-write]: #stream_writable_write_chunk_encoding_callback
[Stream Three States]: #stream_three_states
[writable-_construct]: #stream_writable_construct_callback
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[writable-new]: #stream_constructor_new_stream_writable_options

View File

@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
@ -197,9 +203,16 @@ function Readable(options) {
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
if (typeof options.construct === 'function')
this._construct = options.construct;
}
Stream.call(this, options);
destroyImpl.construct(this, () => {
maybeReadMore(this, this._readableState);
});
}
Readable.prototype.destroy = destroyImpl.destroy;
@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
}
// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored ||
!state.constructed) {
doRead = false;
debug('reading or ended', doRead);
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
@ -587,7 +601,7 @@ function emitReadable_(stream) {
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
if (!state.readingMore) {
if (!state.readingMore && state.constructed) {
state.readingMore = true;
process.nextTick(maybeReadMore_, stream, state);
}

View File

@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;
// Emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams.
this.prefinished = false;
@ -249,9 +255,22 @@ function Writable(options) {
if (typeof options.final === 'function')
this._final = options.final;
if (typeof options.construct === 'function')
this._construct = options.construct;
}
Stream.call(this, options);
destroyImpl.construct(this, () => {
const state = this._writableState;
if (!state.writing) {
clearBuffer(this, state);
}
finishMaybe(this, state);
});
}
// Otherwise people can pipe Writable streams, which is just wrong.
@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.length += len;
if (state.writing || state.corked || state.errored) {
if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
@ -492,7 +511,10 @@ function errorBuffer(state, err) {
// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked || state.bufferProcessing || state.destroyed) {
if (state.corked ||
state.bufferProcessing ||
state.destroyed ||
!state.constructed) {
return;
}
@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
function needFinish(state) {
return (state.ending &&
state.constructed &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0 &&

View File

@ -1,10 +1,20 @@
'use strict';
const {
ERR_MULTIPLE_CALLBACK
} = require('internal/errors').codes;
const { Symbol } = primordials;
const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
// Backwards compat. cb() is undocumented and unused in core but
// unfortunately might be used by modules.
function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;
// With duplex streams we use the writable side for state.
const s = w || r;
if ((w && w.destroyed) || (r && r.destroyed)) {
if (typeof cb === 'function') {
@ -33,7 +43,23 @@ function destroy(err, cb) {
r.destroyed = true;
}
this._destroy(err || null, (err) => {
// If still constructing then defer calling _destroy.
if (!s.constructed) {
this.once(kDestroy, function(er) {
_destroy(this, err || er, cb);
});
} else {
_destroy(this, err, cb);
}
return this;
}
function _destroy(self, err, cb) {
self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;
if (err) {
if (w) {
w.errored = true;
@ -55,13 +81,11 @@ function destroy(err, cb) {
}
if (err) {
process.nextTick(emitErrorCloseNT, this, err);
process.nextTick(emitErrorCloseNT, self, err);
} else {
process.nextTick(emitCloseNT, this);
process.nextTick(emitCloseNT, self);
}
});
return this;
}
function emitErrorCloseNT(self, err) {
@ -108,6 +132,7 @@ function undestroy() {
const w = this._writableState;
if (r) {
r.constructed = true;
r.closed = false;
r.closeEmitted = false;
r.destroyed = false;
@ -119,6 +144,7 @@ function undestroy() {
}
if (w) {
w.constructed = true;
w.destroyed = false;
w.closed = false;
w.closeEmitted = false;
@ -155,7 +181,6 @@ function errorOrDestroy(stream, err, sync) {
if (r) {
r.errored = true;
}
if (sync) {
process.nextTick(emitErrorNT, stream, err);
} else {
@ -164,6 +189,66 @@ function errorOrDestroy(stream, err, sync) {
}
}
function construct(stream, cb) {
if (typeof stream._construct !== 'function') {
return;
}
stream.once(kConstruct, cb);
if (stream.listenerCount(kConstruct) > 1) {
// Duplex
return;
}
const r = stream._readableState;
const w = stream._writableState;
if (r) {
r.constructed = false;
}
if (w) {
w.constructed = false;
}
process.nextTick(constructNT, stream);
}
function constructNT(stream) {
const r = stream._readableState;
const w = stream._writableState;
// With duplex streams we use the writable side for state.
const s = w || r;
let called = false;
stream._construct((err) => {
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
if (called) {
err = new ERR_MULTIPLE_CALLBACK();
} else {
called = true;
}
if (s.destroyed) {
stream.emit(kDestroy, err);
} else if (err) {
errorOrDestroy(stream, err, true);
} else {
process.nextTick(emitConstructNT, stream);
}
});
}
function emitConstructNT(stream) {
stream.emit(kConstruct);
}
function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}
@ -177,6 +262,7 @@ function destroyer(stream, err) {
}
module.exports = {
construct,
destroyer,
destroy,
undestroy,

View File

@ -0,0 +1,244 @@
'use strict';
const common = require('../common');
const { Writable, Readable, Duplex } = require('stream');
const assert = require('assert');
{
// Multiple callback.
new Writable({
construct: common.mustCall((callback) => {
callback();
callback();
})
}).on('error', common.expectsError({
name: 'Error',
code: 'ERR_MULTIPLE_CALLBACK'
}));
}
{
// Multiple callback.
new Readable({
construct: common.mustCall((callback) => {
callback();
callback();
})
}).on('error', common.expectsError({
name: 'Error',
code: 'ERR_MULTIPLE_CALLBACK'
}));
}
{
// Synchronous error.
new Writable({
construct: common.mustCall((callback) => {
callback(new Error('test'));
})
}).on('error', common.expectsError({
name: 'Error',
message: 'test'
}));
}
{
// Synchronous error.
new Readable({
construct: common.mustCall((callback) => {
callback(new Error('test'));
})
}).on('error', common.expectsError({
name: 'Error',
message: 'test'
}));
}
{
// Asynchronous error.
new Writable({
construct: common.mustCall((callback) => {
process.nextTick(callback, new Error('test'));
})
}).on('error', common.expectsError({
name: 'Error',
message: 'test'
}));
}
{
// Asynchronous error.
new Readable({
construct: common.mustCall((callback) => {
process.nextTick(callback, new Error('test'));
})
}).on('error', common.expectsError({
name: 'Error',
message: 'test'
}));
}
function testDestroy(factory) {
{
let constructed = false;
const s = factory({
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
})
});
s.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
s.destroy();
}
{
let constructed = false;
const s = factory({
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
})
});
s.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
s.destroy(null, () => {
assert.strictEqual(constructed, true);
});
}
{
let constructed = false;
const s = factory({
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
})
});
s.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
s.destroy();
}
{
let constructed = false;
const s = factory({
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
})
});
s.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
s.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
s.destroy(new Error('kaboom'), (err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(constructed, true);
});
}
{
let constructed = false;
const s = factory({
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
})
});
s.on('error', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
s.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
s.destroy(new Error());
}
}
testDestroy((opts) => new Readable({
read: common.mustNotCall(),
...opts
}));
testDestroy((opts) => new Writable({
write: common.mustNotCall(),
final: common.mustNotCall(),
...opts
}));
{
let constructed = false;
const r = new Readable({
autoDestroy: true,
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
}),
read: common.mustCall(() => {
assert.strictEqual(constructed, true);
r.push(null);
})
});
r.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
r.on('data', common.mustNotCall());
}
{
let constructed = false;
const w = new Writable({
autoDestroy: true,
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
}),
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(constructed, true);
process.nextTick(cb);
}),
final: common.mustCall((cb) => {
assert.strictEqual(constructed, true);
process.nextTick(cb);
})
});
w.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
w.end('data');
}
{
let constructed = false;
const w = new Writable({
autoDestroy: true,
construct: common.mustCall((cb) => {
constructed = true;
process.nextTick(cb);
}),
write: common.mustNotCall(),
final: common.mustCall((cb) => {
assert.strictEqual(constructed, true);
process.nextTick(cb);
})
});
w.on('close', common.mustCall(() => {
assert.strictEqual(constructed, true);
}));
w.end();
}
{
new Duplex({
construct: common.mustCall()
});
}