node/lib/internal/streams/from.js
Yash Ladha 364ac788e4 stream: remove redundant context from comments
Using the variable name in the comment and justifying the type seems
redundant to me and instead it should defined the entity which it is
acting, like in our case it is acting as a flag to control the flow in
streams.

PR-URL: https://github.com/nodejs/node/pull/35728
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anto Aravinth <anto.aravinth.cse@gmail.com>
2020-10-25 17:41:21 +05:30

102 lines
2.2 KiB
JavaScript

'use strict';
const {
SymbolAsyncIterator,
SymbolIterator
} = primordials;
const { Buffer } = require('buffer');
const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_NULL_VALUES
} = require('internal/errors').codes;
function from(Readable, iterable, opts) {
let iterator;
if (typeof iterable === 'string' || iterable instanceof Buffer) {
return new Readable({
objectMode: true,
...opts,
read() {
this.push(iterable);
this.push(null);
}
});
}
if (iterable && iterable[SymbolAsyncIterator])
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
iterator = iterable[SymbolIterator]();
else
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
const readable = new Readable({
objectMode: true,
highWaterMark: 1,
// TODO(ronag): What options should be allowed?
...opts
});
// Flag to protect against _read
// being called before last iteration completion.
let reading = false;
// Flag for when iterator needs to be explicitly closed.
let needToClose = false;
readable._read = function() {
if (!reading) {
reading = true;
next();
}
};
readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
close().then(
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
} else {
cb(error);
}
};
async function close() {
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
}
}
async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
needToClose = !done;
if (done) {
readable.push(null);
} else if (readable.destroyed) {
await close();
} else {
const res = await value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
next();
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
}
return readable;
}
module.exports = from;