node/test/parallel/test-stream-pipeline.js
Robert Nagy b2be348fcc stream: ensure pipeline always destroys streams
There was an edge case where an incorrect assumption was made
in regardos whether eos/finished means that the stream is
actually destroyed or not.

PR-URL: https://github.com/nodejs/node/pull/31940
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
2020-02-26 23:48:14 +01:00

925 lines
18 KiB
JavaScript

'use strict';
const common = require('../common');
const {
Stream,
Writable,
Readable,
Transform,
pipeline,
PassThrough
} = require('stream');
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c')
];
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
processed.push(data);
cb();
}
});
write.on('finish', () => {
finished = true;
});
for (let i = 0; i < expected.length; i++) {
read.push(expected[i]);
}
read.push(null);
pipeline(read, write, common.mustCall((err) => {
assert.ok(!err, 'no error');
assert.ok(finished);
assert.deepStrictEqual(processed, expected);
}));
}
{
const read = new Readable({
read() {}
});
assert.throws(() => {
pipeline(read, () => {});
}, /ERR_MISSING_ARGS/);
assert.throws(() => {
pipeline(() => {});
}, /ERR_MISSING_ARGS/);
assert.throws(() => {
pipeline();
}, /ERR_INVALID_CALLBACK/);
}
{
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
setImmediate(() => read.destroy());
pipeline(read, write, common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}
{
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
setImmediate(() => read.destroy(new Error('kaboom')));
const dst = pipeline(read, write, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
assert.strictEqual(dst, write);
}
{
const read = new Readable({
read() {}
});
const transform = new Transform({
transform(data, enc, cb) {
cb(new Error('kaboom'));
}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.on('close', common.mustCall());
transform.on('close', common.mustCall());
write.on('close', common.mustCall());
[read, transform, write].forEach((stream) => {
stream.on('error', common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
});
const dst = pipeline(read, transform, write, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
assert.strictEqual(dst, write);
read.push('hello');
}
{
const server = http.createServer((req, res) => {
const rs = new Readable({
read() {
rs.push('hello');
rs.push(null);
}
});
pipeline(rs, res, () => {});
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.end();
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}
{
const server = http.createServer((req, res) => {
let sent = false;
const rs = new Readable({
read() {
if (sent) {
return;
}
sent = true;
rs.push('hello');
},
destroy: common.mustCall((err, cb) => {
// Prevents fd leaks by destroying http pipelines
cb();
})
});
pipeline(rs, res, () => {});
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.end();
req.on('response', (res) => {
setImmediate(() => {
res.destroy();
server.close();
});
});
});
}
{
const server = http.createServer((req, res) => {
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
},
destroy: common.mustCall((err, cb) => {
cb();
})
});
pipeline(rs, res, () => {});
});
let cnt = 10;
const badSink = new Writable({
write(data, enc, cb) {
cnt--;
if (cnt === 0) cb(new Error('kaboom'));
else cb();
}
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.end();
req.on('response', (res) => {
pipeline(res, badSink, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
server.close();
}));
});
});
}
{
const server = http.createServer((req, res) => {
pipeline(req, res, common.mustCall());
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
}
});
pipeline(rs, req, common.mustCall(() => {
server.close();
}));
req.on('response', (res) => {
let cnt = 10;
res.on('data', () => {
cnt--;
if (cnt === 0) rs.destroy();
});
});
});
}
{
const makeTransform = () => {
const tr = new Transform({
transform(data, enc, cb) {
cb(null, data);
}
});
tr.on('close', common.mustCall());
return tr;
};
const rs = new Readable({
read() {
rs.push('hello');
}
});
let cnt = 10;
const ws = new Writable({
write(data, enc, cb) {
cnt--;
if (cnt === 0) return cb(new Error('kaboom'));
cb();
}
});
rs.on('close', common.mustCall());
ws.on('close', common.mustCall());
pipeline(
rs,
makeTransform(),
makeTransform(),
makeTransform(),
makeTransform(),
makeTransform(),
makeTransform(),
ws,
common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
})
);
}
{
const oldStream = new Stream();
oldStream.pause = oldStream.resume = () => {};
oldStream.write = (data) => {
oldStream.emit('data', data);
return true;
};
oldStream.end = () => {
oldStream.emit('end');
};
const expected = [
Buffer.from('hello'),
Buffer.from('world')
];
const rs = new Readable({
read() {
for (let i = 0; i < expected.length; i++) {
rs.push(expected[i]);
}
rs.push(null);
}
});
const ws = new Writable({
write(data, enc, cb) {
assert.deepStrictEqual(data, expected.shift());
cb();
}
});
let finished = false;
ws.on('finish', () => {
finished = true;
});
pipeline(
rs,
oldStream,
ws,
common.mustCall((err) => {
assert(!err, 'no error');
assert(finished, 'last stream finished');
})
);
}
{
const oldStream = new Stream();
oldStream.pause = oldStream.resume = () => {};
oldStream.write = (data) => {
oldStream.emit('data', data);
return true;
};
oldStream.end = () => {
oldStream.emit('end');
};
const destroyableOldStream = new Stream();
destroyableOldStream.pause = destroyableOldStream.resume = () => {};
destroyableOldStream.destroy = common.mustCall(() => {
destroyableOldStream.emit('close');
});
destroyableOldStream.write = (data) => {
destroyableOldStream.emit('data', data);
return true;
};
destroyableOldStream.end = () => {
destroyableOldStream.emit('end');
};
const rs = new Readable({
read() {
rs.destroy(new Error('stop'));
}
});
const ws = new Writable({
write(data, enc, cb) {
cb();
}
});
let finished = false;
ws.on('finish', () => {
finished = true;
});
pipeline(
rs,
oldStream,
destroyableOldStream,
ws,
common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('stop'));
assert(!finished, 'should not finish');
})
);
}
{
const pipelinePromise = promisify(pipeline);
async function run() {
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
read.push(null);
let finished = false;
write.on('finish', () => {
finished = true;
});
await pipelinePromise(read, write);
assert(finished);
}
run();
}
{
const read = new Readable({
read() {}
});
const transform = new Transform({
transform(data, enc, cb) {
cb(new Error('kaboom'));
}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
assert.throws(
() => pipeline(read, transform, write),
{ code: 'ERR_INVALID_CALLBACK' }
);
}
{
const server = http.Server(function(req, res) {
res.write('asd');
});
server.listen(0, function() {
http.get({ port: this.address().port }, (res) => {
const stream = new PassThrough();
stream.on('error', common.mustCall());
pipeline(
res,
stream,
common.mustCall((err) => {
assert.ok(err);
// TODO(ronag):
// assert.strictEqual(err.message, 'oh no');
server.close();
})
);
stream.destroy(new Error('oh no'));
}).on('error', common.mustNotCall());
});
}
{
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline(function*() {
yield 'hello';
yield 'world';
}(), w, common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}
{
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}(), w, common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}
{
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline(function*() {
yield 'hello';
yield 'world';
}, w, common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}
{
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, w, common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}
{
let res = '';
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, async function*(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
}
}, async function(source) {
for await (const chunk of source) {
res += chunk;
}
}, common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'HELLOWORLD');
}));
}
{
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, async function*(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
}
}, async function(source) {
let ret = '';
for await (const chunk of source) {
ret += chunk;
}
return ret;
}, common.mustCall((err, val) => {
assert.ok(!err);
assert.strictEqual(val, 'HELLOWORLD');
}));
}
{
// AsyncIterable destination is returned and finalizes.
const ret = pipeline(async function*() {
await Promise.resolve();
yield 'hello';
}, async function*(source) {
for await (const chunk of source) {
chunk;
}
}, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
ret.resume();
assert.strictEqual(typeof ret.pipe, 'function');
}
{
// AsyncFunction destination is not returned and error is
// propagated.
const ret = pipeline(async function*() {
await Promise.resolve();
throw new Error('kaboom');
}, async function*(source) {
for await (const chunk of source) {
chunk;
}
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
ret.resume();
assert.strictEqual(typeof ret.pipe, 'function');
}
{
const s = new PassThrough();
pipeline(async function*() {
throw new Error('kaboom');
}, s, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
}
{
const s = new PassThrough();
pipeline(async function*() {
throw new Error('kaboom');
}(), s, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
}
{
const s = new PassThrough();
pipeline(function*() {
throw new Error('kaboom');
}, s, common.mustCall((err, val) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
}
{
const s = new PassThrough();
pipeline(function*() {
throw new Error('kaboom');
}(), s, common.mustCall((err, val) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
}
{
const s = new PassThrough();
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, s, async function(source) {
for await (const chunk of source) {
chunk;
throw new Error('kaboom');
}
}, common.mustCall((err, val) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
}
{
const s = new PassThrough();
const ret = pipeline(function() {
return ['hello', 'world'];
}, s, async function*(source) {
for await (const chunk of source) {
chunk;
throw new Error('kaboom');
}
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
ret.resume();
assert.strictEqual(typeof ret.pipe, 'function');
}
{
// Legacy streams without async iterator.
const s = new PassThrough();
s.push('asd');
s.push(null);
s[Symbol.asyncIterator] = null;
let ret = '';
pipeline(s, async function(source) {
for await (const chunk of source) {
ret += chunk;
}
}, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(ret, 'asd');
}));
}
{
// v1 streams without read().
const s = new Stream();
process.nextTick(() => {
s.emit('data', 'asd');
s.emit('end');
});
// 'destroyer' can be called multiple times,
// once from stream wrapper and
// once from iterator wrapper.
s.close = common.mustCallAtLeast(1);
let ret = '';
pipeline(s, async function(source) {
for await (const chunk of source) {
ret += chunk;
}
}, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(ret, 'asd');
}));
}
{
// v1 error streams without read().
const s = new Stream();
process.nextTick(() => {
s.emit('error', new Error('kaboom'));
});
s.destroy = common.mustCall();
pipeline(s, async function(source) {
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
}
{
const s = new PassThrough();
assert.throws(() => {
pipeline(function(source) {
}, s, () => {});
}, (err) => {
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
assert.strictEqual(s.destroyed, false);
return true;
});
}
{
const s = new PassThrough();
assert.throws(() => {
pipeline(s, function(source) {
}, s, () => {});
}, (err) => {
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
assert.strictEqual(s.destroyed, false);
return true;
});
}
{
const s = new PassThrough();
assert.throws(() => {
pipeline(s, function(source) {
}, () => {});
}, (err) => {
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
assert.strictEqual(s.destroyed, false);
return true;
});
}
{
const s = new PassThrough();
assert.throws(() => {
pipeline(s, function*(source) {
}, () => {});
}, (err) => {
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
assert.strictEqual(s.destroyed, false);
return true;
});
}
{
let res = '';
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, new Transform({
transform(chunk, encoding, cb) {
cb(new Error('kaboom'));
}
}), async function(source) {
for await (const chunk of source) {
res += chunk;
}
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(res, '');
}));
}
{
let res = '';
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, new Transform({
transform(chunk, encoding, cb) {
process.nextTick(cb, new Error('kaboom'));
}
}), async function(source) {
for await (const chunk of source) {
res += chunk;
}
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(res, '');
}));
}
{
let res = '';
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, new Transform({
decodeStrings: false,
transform(chunk, encoding, cb) {
cb(null, chunk.toUpperCase());
}
}), async function(source) {
for await (const chunk of source) {
res += chunk;
}
}, common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'HELLOWORLD');
}));
}
{
// Ensure no unhandled rejection from async function.
pipeline(async function*() {
yield 'hello';
}, async function(source) {
throw new Error('kaboom');
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
}
{
const src = new PassThrough({ autoDestroy: false });
const dst = new PassThrough({ autoDestroy: false });
pipeline(src, dst, common.mustCall(() => {
assert.strictEqual(src.destroyed, true);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
}