node/lib/internal/streams/from.js
Robert Nagy 8607f9ec5c stream: make from read one at a time
Currently from will eagerly buffer up items
which means that errors are also eagerly
encountered and items which are buffer when
an error occurs will be discarded, which is
inconsistent with how generators work.

Fixes: https://github.com/nodejs/node/issues/29428

PR-URL: https://github.com/nodejs/node/pull/33201
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
2020-05-06 22:39:33 +02:00

102 lines
2.2 KiB
JavaScript

'use strict';
const {
SymbolAsyncIterator,
SymbolIterator
} = primordials;
const { Buffer } = require('buffer');
const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_NULL_VALUES
} = require('internal/errors').codes;
function from(Readable, iterable, opts) {
let iterator;
if (typeof iterable === 'string' || iterable instanceof Buffer) {
return new Readable({
objectMode: true,
...opts,
read() {
this.push(iterable);
this.push(null);
}
});
}
if (iterable && iterable[SymbolAsyncIterator])
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
iterator = iterable[SymbolIterator]();
else
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
const readable = new Readable({
objectMode: true,
highWaterMark: 1,
// TODO(ronag): What options should be allowed?
...opts
});
// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;
// needToClose boolean if iterator needs to be explicitly closed
let needToClose = false;
readable._read = function() {
if (!reading) {
reading = true;
next();
}
};
readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
close().then(
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
} else {
cb(error);
}
};
async function close() {
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
}
}
async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
needToClose = !done;
if (done) {
readable.push(null);
} else if (readable.destroyed) {
await close();
} else {
const res = await value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
next();
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
}
return readable;
}
module.exports = from;