node/lib/internal/streams/async_iterator.js
Michaël Zasso 0646eda4fc
lib: flatten access to primordials
Store all primordials as properties of the primordials object.
Static functions are prefixed by the constructor's name and prototype
methods are prefixed by the constructor's name followed by "Prototype".
For example: primordials.Object.keys becomes primordials.ObjectKeys.

PR-URL: https://github.com/nodejs/node/pull/30610
Refs: https://github.com/nodejs/node/issues/29766
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
2019-11-25 10:28:15 +01:00

202 lines
5.6 KiB
JavaScript

'use strict';
const {
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
} = primordials;
const finished = require('internal/streams/end-of-stream');
// Private Symbol keys used to store iterator state on the iterator object.

// resolve() of the next() promise that is currently waiting for data,
// or null when nothing is waiting.
const kLastResolve = Symbol('lastResolve');
// reject() of the next() promise that is currently waiting for data,
// or null when nothing is waiting.
const kLastReject = Symbol('lastReject');
// Error reported by finished() for the stream; null until one is stored.
const kError = Symbol('error');
// True once the stream has been observed as ended.
const kEnded = Symbol('ended');
// The promise most recently handed out by next() that has not settled yet,
// used to queue later next() calls behind it.
const kLastPromise = Symbol('lastPromise');
// Cached Promise executor that tries to read a chunk, or parks
// resolve/reject on the iterator when nothing is available.
const kHandlePromise = Symbol('handlePromise');
// The stream being iterated.
const kStream = Symbol('stream');
/**
 * Builds an iterator result record.
 *
 * @param {*} value - The value carried by this iteration step.
 * @param {boolean} done - Whether the iteration has completed.
 * @returns {{ value: *, done: boolean }} A fresh `{ value, done }` object.
 */
function createIterResult(value, done) {
  const result = { value, done };
  return result;
}
/**
 * Tries to satisfy the pending next() promise of `iter` with a chunk read
 * from its stream.
 *
 * Does nothing when no promise is waiting. When read() yields null the
 * pending promise is left untouched.
 *
 * @param {object} iter - Iterator carrying the kStream / kLastResolve state.
 */
function readAndResolve(iter) {
  const pendingResolve = iter[kLastResolve];
  if (pendingResolve === null) {
    return;
  }

  const chunk = iter[kStream].read();
  if (chunk === null) {
    // Nothing to hand out yet: keep waiting, since either 'end' or 'error'
    // may still be on its way.
    return;
  }

  // Clear the pending state before resolving.
  iter[kLastPromise] = null;
  iter[kLastResolve] = null;
  iter[kLastReject] = null;
  pendingResolve(createIterResult(chunk, false));
}
/**
 * Handler attached to the stream's 'readable' event with the iterator bound
 * as its argument. It does not read synchronously; it schedules
 * readAndResolve(iter) for the next tick.
 *
 * @param {object} iter - Iterator forwarded to readAndResolve.
 */
function onReadable(iter) {
// We wait for the next tick, because it might
// emit an error with `process.nextTick()`.
process.nextTick(readAndResolve, iter);
}
/**
 * Creates a Promise executor that queues a next() call behind the previously
 * returned promise.
 *
 * Once `lastPromise` fulfills, the executor either resolves with a "done"
 * result (if the iterator has ended in the meantime) or delegates to the
 * iterator's cached kHandlePromise executor. A rejection of `lastPromise`
 * is forwarded to `reject`.
 *
 * @param {Promise} lastPromise - The promise the new call must wait for.
 * @param {object} iter - Iterator carrying the kEnded / kHandlePromise state.
 * @returns {Function} Executor suitable for `new Promise(...)`.
 */
function wrapForNext(lastPromise, iter) {
  return (resolve, reject) => {
    const onPreviousFulfilled = () => {
      if (!iter[kEnded]) {
        iter[kHandlePromise](resolve, reject);
        return;
      }
      resolve(createIterResult(undefined, true));
    };

    lastPromise.then(onPreviousFulfilled, reject);
  };
}
// Reach the shared async iterator prototype by walking up from an async
// generator function: the prototype of the function object exposes, via its
// `.prototype` property, the object async generator instances inherit from;
// that object's own prototype is the one used below as the base of
// ReadableStreamAsyncIteratorPrototype.
const AsyncGeneratorPrototype =
  ObjectGetPrototypeOf(async function* () {}).prototype;
const AsyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype);
/**
 * Prototype shared by all readable-stream async iterators. It inherits from
 * AsyncIteratorPrototype and keeps its state under the private Symbol keys
 * (kStream, kError, kEnded, kLastPromise, kHandlePromise).
 */
const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
  /** The stream this iterator reads from. */
  get stream() {
    return this[kStream];
  },

  /**
   * Returns a promise for the next `{ value, done }` result.
   *
   * Rejects immediately if an error has already been recorded, resolves with
   * `done: true` if the stream has ended, and otherwise either returns a
   * chunk that is already readable or waits for one.
   *
   * @returns {Promise<{ value: *, done: boolean }>}
   */
  next() {
    // An error recorded earlier wins over everything else.
    const recordedError = this[kError];
    if (recordedError !== null) {
      return Promise.reject(recordedError);
    }

    if (this[kEnded]) {
      return Promise.resolve(createIterResult(undefined, true));
    }

    if (this[kStream].destroyed) {
      // Defer by one tick: when .destroy(err) is called the error is emitted
      // via nextTick, so an error may still be pending delivery even though
      // none has been recorded yet.
      return new Promise((resolve, reject) => {
        process.nextTick(() => {
          const lateError = this[kError];
          if (lateError) {
            reject(lateError);
            return;
          }
          resolve(createIterResult(undefined, true));
        });
      });
    }

    // With several outstanding next() calls, each one waits for the promise
    // before it. The common `for await` case only ever has one call in
    // flight and takes the branch further down.
    const previous = this[kLastPromise];
    if (previous) {
      const queued = new Promise(wrapForNext(previous, this));
      this[kLastPromise] = queued;
      return queued;
    }

    // Fast path: hand out data that is already readable (e.g. after several
    // this.push() calls) without going through the next() queue.
    const chunk = this[kStream].read();
    if (chunk !== null) {
      return Promise.resolve(createIterResult(chunk, false));
    }

    const waiting = new Promise(this[kHandlePromise]);
    this[kLastPromise] = waiting;
    return waiting;
  },

  /**
   * Ends the iteration early by destroying the stream.
   *
   * Resolves with `done: true` once the stream has finished; a premature
   * close is treated as a normal completion, any other error rejects.
   *
   * @returns {Promise<{ value: undefined, done: boolean }>}
   */
  return() {
    return new Promise((resolve, reject) => {
      const source = this[kStream];

      // TODO(ronag): Remove this check once finished() handles
      // already ended and/or destroyed streams.
      const alreadyOver = source.destroyed || source.readableEnded ||
        (source._readableState && source._readableState.endEmitted);

      if (!alreadyOver) {
        finished(source, (err) => {
          if (!err || err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
            resolve(createIterResult(undefined, true));
          } else {
            reject(err);
          }
        });
        source.destroy();
        return;
      }

      resolve(createIterResult(undefined, true));
    });
  },
}, AsyncIteratorPrototype);
/**
 * Creates an async iterator for a readable stream.
 *
 * The iterator inherits from ReadableStreamAsyncIteratorPrototype and keeps
 * its state under private Symbol keys. Two listeners are registered on the
 * stream: a finished() callback that records end/error and settles a waiting
 * next() promise, and a 'readable' handler that feeds a waiting next()
 * promise with newly available data.
 *
 * @param {object} stream - The stream to iterate. It must expose read(),
 *   on(), and a `_readableState` object.
 * @returns {object} The async iterator bound to `stream`.
 */
const createReadableStreamAsyncIterator = (stream) => {
  const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
    [kStream]: { value: stream, writable: true },
    [kLastResolve]: { value: null, writable: true },
    [kLastReject]: { value: null, writable: true },
    [kError]: { value: null, writable: true },
    [kEnded]: {
      value: stream.readableEnded || stream._readableState.endEmitted,
      writable: true
    },
    // The function passed to new Promise is cached so we avoid allocating a
    // new closure at every run.
    [kHandlePromise]: {
      value: (resolve, reject) => {
        const data = iterator[kStream].read();
        // Compare against null explicitly, like the other read() call sites
        // in this file. A truthiness test would treat falsy chunks (0, '',
        // false) as "no data" even though read() has already handed them
        // out, so they would be dropped and the promise left pending.
        if (data !== null) {
          iterator[kLastPromise] = null;
          iterator[kLastResolve] = null;
          iterator[kLastReject] = null;
          resolve(createIterResult(data, false));
        } else {
          // Nothing available yet: park the settle functions so that the
          // 'readable' handler or the finished() callback can use them.
          iterator[kLastResolve] = resolve;
          iterator[kLastReject] = reject;
        }
      },
      writable: true,
    },
  });
  iterator[kLastPromise] = null;

  finished(stream, { writable: false }, (err) => {
    if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
      const reject = iterator[kLastReject];
      // Reject if we are waiting for data in the Promise returned by next()
      // and store the error.
      if (reject !== null) {
        iterator[kLastPromise] = null;
        iterator[kLastResolve] = null;
        iterator[kLastReject] = null;
        reject(err);
      }
      iterator[kError] = err;
      return;
    }

    // Normal completion (or premature close): finish a waiting next() call
    // with a "done" result and remember that the stream has ended.
    const resolve = iterator[kLastResolve];
    if (resolve !== null) {
      iterator[kLastPromise] = null;
      iterator[kLastResolve] = null;
      iterator[kLastReject] = null;
      resolve(createIterResult(undefined, true));
    }
    iterator[kEnded] = true;
  });

  stream.on('readable', onReadable.bind(null, iterator));

  return iterator;
};
module.exports = createReadableStreamAsyncIterator;