'use strict'; const { pipeline } = require('internal/streams/pipeline'); const Duplex = require('internal/streams/duplex'); const { destroyer } = require('internal/streams/destroy'); const { isNodeStream, isReadable, isWritable, } = require('internal/streams/utils'); const { AbortError, codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS, }, } = require('internal/errors'); module.exports = function compose(...streams) { if (streams.length === 0) { throw new ERR_MISSING_ARGS('streams'); } if (streams.length === 1) { return Duplex.from(streams[0]); } const orgStreams = [...streams]; if (typeof streams[0] === 'function') { streams[0] = Duplex.from(streams[0]); } if (typeof streams[streams.length - 1] === 'function') { const idx = streams.length - 1; streams[idx] = Duplex.from(streams[idx]); } for (let n = 0; n < streams.length; ++n) { if (!isNodeStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue; } if (n < streams.length - 1 && !isReadable(streams[n])) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], 'must be readable' ); } if (n > 0 && !isWritable(streams[n])) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], 'must be writable' ); } } let ondrain; let onfinish; let onreadable; let onclose; let d; function onfinished(err) { const cb = onclose; onclose = null; if (cb) { cb(err); } else if (err) { d.destroy(err); } else if (!readable && !writable) { d.destroy(); } } const head = streams[0]; const tail = pipeline(streams, onfinished); const writable = !!isWritable(head); const readable = !!isReadable(tail); // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. // See, https://github.com/nodejs/node/pull/33515. d = new Duplex({ // TODO (ronag): highWaterMark? writableObjectMode: !!head?.writableObjectMode, readableObjectMode: !!tail?.writableObjectMode, writable, readable, }); if (writable) { d._write = function(chunk, encoding, callback) { if (head.write(chunk, encoding)) { callback(); } else { ondrain = callback; } }; d._final = function(callback) { head.end(); onfinish = callback; }; head.on('drain', function() { if (ondrain) { const cb = ondrain; ondrain = null; cb(); } }); tail.on('finish', function() { if (onfinish) { const cb = onfinish; onfinish = null; cb(); } }); } if (readable) { tail.on('readable', function() { if (onreadable) { const cb = onreadable; onreadable = null; cb(); } }); tail.on('end', function() { d.push(null); }); d._read = function() { while (true) { const buf = tail.read(); if (buf === null) { onreadable = d._read; return; } if (!d.push(buf)) { return; } } }; } d._destroy = function(err, callback) { if (!err && onclose !== null) { err = new AbortError(); } onreadable = null; ondrain = null; onfinish = null; if (onclose === null) { callback(err); } else { onclose = callback; destroyer(tail, err); } }; return d; };