node/lib/internal/webstreams/transformstream.js
Antoine du Hamel d42628d05a
stream: add trailing commas in webstream source files
PR-URL: https://github.com/nodejs/node/pull/46685
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
Reviewed-By: Darshan Sen <raisinten@gmail.com>
2023-02-18 18:55:52 +01:00

607 lines
15 KiB
JavaScript

'use strict';
const {
FunctionPrototypeBind,
FunctionPrototypeCall,
ObjectDefineProperties,
PromisePrototypeThen,
PromiseResolve,
ReflectConstruct,
SymbolToStringTag,
} = primordials;
const {
codes: {
ERR_ILLEGAL_CONSTRUCTOR,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_INVALID_THIS,
},
} = require('internal/errors');
const {
DOMException,
} = internalBinding('messaging');
const {
createDeferredPromise,
customInspectSymbol: kInspect,
kEmptyObject,
kEnumerableProperty,
} = require('internal/util');
const {
kDeserialize,
kTransfer,
kTransferList,
makeTransferable,
} = require('internal/worker/js_transferable');
const {
customInspect,
ensureIsPromise,
extractHighWaterMark,
extractSizeAlgorithm,
isBrandCheck,
nonOpFlush,
kType,
kState,
} = require('internal/webstreams/util');
const {
ReadableStream,
readableStreamDefaultControllerCanCloseOrEnqueue,
readableStreamDefaultControllerClose,
readableStreamDefaultControllerEnqueue,
readableStreamDefaultControllerError,
readableStreamDefaultControllerGetDesiredSize,
readableStreamDefaultControllerHasBackpressure,
} = require('internal/webstreams/readablestream');
const {
WritableStream,
writableStreamDefaultControllerErrorIfNeeded,
} = require('internal/webstreams/writablestream');
const assert = require('internal/assert');
const getNonWritablePropertyDescriptor = (value) => {
return {
__proto__: null,
configurable: true,
value,
};
};
/**
* @typedef {import('./queuingstrategies').QueuingStrategy
* } QueuingStrategy
* @typedef {import('./queuingstrategies').QueuingStrategySize
* } QueuingStrategySize
*/
/**
* @callback TransformerStartCallback
* @param {TransformStreamDefaultController} controller;
*/
/**
* @callback TransformerFlushCallback
* @param {TransformStreamDefaultController} controller;
* @returns {Promise<void>}
*/
/**
* @callback TransformerTransformCallback
* @param {any} chunk
* @param {TransformStreamDefaultController} controller
* @returns {Promise<void>}
*/
/**
* @typedef {{
* start? : TransformerStartCallback,
* transform? : TransformerTransformCallback,
* flush? : TransformerFlushCallback,
* readableType? : any,
* writableType? : any,
* }} Transformer
*/
class TransformStream {
[kType] = 'TransformStream';
/**
* @param {Transformer} [transformer]
* @param {QueuingStrategy} [writableStrategy]
* @param {QueuingStrategy} [readableStrategy]
*/
constructor(
transformer = null,
writableStrategy = kEmptyObject,
readableStrategy = kEmptyObject) {
const readableType = transformer?.readableType;
const writableType = transformer?.writableType;
const start = transformer?.start;
if (readableType !== undefined) {
throw new ERR_INVALID_ARG_VALUE.RangeError(
'transformer.readableType',
readableType);
}
if (writableType !== undefined) {
throw new ERR_INVALID_ARG_VALUE.RangeError(
'transformer.writableType',
writableType);
}
const readableHighWaterMark = readableStrategy?.highWaterMark;
const readableSize = readableStrategy?.size;
const writableHighWaterMark = writableStrategy?.highWaterMark;
const writableSize = writableStrategy?.size;
const actualReadableHighWaterMark =
extractHighWaterMark(readableHighWaterMark, 0);
const actualReadableSize = extractSizeAlgorithm(readableSize);
const actualWritableHighWaterMark =
extractHighWaterMark(writableHighWaterMark, 1);
const actualWritableSize = extractSizeAlgorithm(writableSize);
const startPromise = createDeferredPromise();
initializeTransformStream(
this,
startPromise,
actualWritableHighWaterMark,
actualWritableSize,
actualReadableHighWaterMark,
actualReadableSize);
setupTransformStreamDefaultControllerFromTransformer(this, transformer);
if (start !== undefined) {
startPromise.resolve(
FunctionPrototypeCall(
start,
transformer,
this[kState].controller));
} else {
startPromise.resolve();
}
// eslint-disable-next-line no-constructor-return
return makeTransferable(this);
}
/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isTransformStream(this))
throw new ERR_INVALID_THIS('TransformStream');
return this[kState].readable;
}
/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isTransformStream(this))
throw new ERR_INVALID_THIS('TransformStream');
return this[kState].writable;
}
[kInspect](depth, options) {
return customInspect(depth, options, this[kType], {
readable: this.readable,
writable: this.writable,
backpressure: this[kState].backpressure,
});
}
[kTransfer]() {
if (!isTransformStream(this))
throw new ERR_INVALID_THIS('TransformStream');
const {
readable,
writable,
} = this[kState];
if (readable.locked) {
throw new DOMException(
'Cannot transfer a locked ReadableStream',
'DataCloneError');
}
if (writable.locked) {
throw new DOMException(
'Cannot transfer a locked WritableStream',
'DataCloneError');
}
return {
data: {
readable,
writable,
},
deserializeInfo:
'internal/webstreams/transformstream:TransferredTransformStream',
};
}
[kTransferList]() {
return [ this[kState].readable, this[kState].writable ];
}
[kDeserialize]({ readable, writable }) {
this[kState].readable = readable;
this[kState].writable = writable;
}
}
ObjectDefineProperties(TransformStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name),
});
function TransferredTransformStream() {
return makeTransferable(ReflectConstruct(
function() {
this[kType] = 'TransformStream';
this[kState] = {
readable: undefined,
writable: undefined,
backpressure: undefined,
backpressureChange: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
controller: undefined,
};
},
[], TransformStream));
}
TransferredTransformStream.prototype[kDeserialize] = () => {};
class TransformStreamDefaultController {
[kType] = 'TransformStreamDefaultController';
constructor() {
throw new ERR_ILLEGAL_CONSTRUCTOR();
}
/**
* @readonly
* @type {number}
*/
get desiredSize() {
if (!isTransformStreamDefaultController(this))
throw new ERR_INVALID_THIS('TransformStreamDefaultController');
const {
stream,
} = this[kState];
const {
readable,
} = stream[kState];
const {
controller: readableController,
} = readable[kState];
return readableStreamDefaultControllerGetDesiredSize(readableController);
}
/**
* @param {any} [chunk]
*/
enqueue(chunk = undefined) {
if (!isTransformStreamDefaultController(this))
throw new ERR_INVALID_THIS('TransformStreamDefaultController');
transformStreamDefaultControllerEnqueue(this, chunk);
}
/**
* @param {any} [reason]
*/
error(reason = undefined) {
if (!isTransformStreamDefaultController(this))
throw new ERR_INVALID_THIS('TransformStreamDefaultController');
transformStreamDefaultControllerError(this, reason);
}
terminate() {
if (!isTransformStreamDefaultController(this))
throw new ERR_INVALID_THIS('TransformStreamDefaultController');
transformStreamDefaultControllerTerminate(this);
}
[kInspect](depth, options) {
return customInspect(depth, options, this[kType], {
stream: this[kState].stream,
});
}
}
ObjectDefineProperties(TransformStreamDefaultController.prototype, {
desiredSize: kEnumerableProperty,
enqueue: kEnumerableProperty,
error: kEnumerableProperty,
terminate: kEnumerableProperty,
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStreamDefaultController.name),
});
function createTransformStreamDefaultController() {
return ReflectConstruct(
function() {
this[kType] = 'TransformStreamDefaultController';
},
[],
TransformStreamDefaultController);
}
const isTransformStream =
isBrandCheck('TransformStream');
const isTransformStreamDefaultController =
isBrandCheck('TransformStreamDefaultController');
async function defaultTransformAlgorithm(chunk, controller) {
transformStreamDefaultControllerEnqueue(controller, chunk);
}
function initializeTransformStream(
stream,
startPromise,
writableHighWaterMark,
writableSizeAlgorithm,
readableHighWaterMark,
readableSizeAlgorithm) {
const writable = new WritableStream({
__proto__: null,
start() { return startPromise.promise; },
write(chunk) {
return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
},
abort(reason) {
return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
},
close() {
return transformStreamDefaultSinkCloseAlgorithm(stream);
},
}, {
highWaterMark: writableHighWaterMark,
size: writableSizeAlgorithm,
});
const readable = new ReadableStream({
__proto__: null,
start() { return startPromise.promise; },
pull() {
return transformStreamDefaultSourcePullAlgorithm(stream);
},
cancel(reason) {
transformStreamErrorWritableAndUnblockWrite(stream, reason);
return PromiseResolve();
},
}, {
highWaterMark: readableHighWaterMark,
size: readableSizeAlgorithm,
});
stream[kState] = {
readable,
writable,
controller: undefined,
backpressure: undefined,
backpressureChange: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
};
transformStreamSetBackpressure(stream, true);
}
function transformStreamError(stream, error) {
const {
readable,
} = stream[kState];
const {
controller,
} = readable[kState];
readableStreamDefaultControllerError(controller, error);
transformStreamErrorWritableAndUnblockWrite(stream, error);
}
function transformStreamErrorWritableAndUnblockWrite(stream, error) {
const {
controller,
writable,
} = stream[kState];
transformStreamDefaultControllerClearAlgorithms(controller);
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
error);
if (stream[kState].backpressure)
transformStreamSetBackpressure(stream, false);
}
function transformStreamSetBackpressure(stream, backpressure) {
assert(stream[kState].backpressure !== backpressure);
if (stream[kState].backpressureChange.promise !== undefined)
stream[kState].backpressureChange.resolve?.();
stream[kState].backpressureChange = createDeferredPromise();
stream[kState].backpressure = backpressure;
}
function setupTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm) {
assert(isTransformStream(stream));
assert(stream[kState].controller === undefined);
controller[kState] = {
stream,
transformAlgorithm,
flushAlgorithm,
};
stream[kState].controller = controller;
}
function setupTransformStreamDefaultControllerFromTransformer(
stream,
transformer) {
const controller = createTransformStreamDefaultController();
const transform = transformer?.transform || defaultTransformAlgorithm;
const flush = transformer?.flush || nonOpFlush;
const transformAlgorithm =
FunctionPrototypeBind(transform, transformer);
const flushAlgorithm =
FunctionPrototypeBind(flush, transformer);
setupTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm);
}
function transformStreamDefaultControllerClearAlgorithms(controller) {
controller[kState].transformAlgorithm = undefined;
controller[kState].flushAlgorithm = undefined;
}
function transformStreamDefaultControllerEnqueue(controller, chunk) {
const {
stream,
} = controller[kState];
const {
readable,
} = stream[kState];
const {
controller: readableController,
} = readable[kState];
if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
throw new ERR_INVALID_STATE.TypeError('Unable to enqueue');
try {
readableStreamDefaultControllerEnqueue(readableController, chunk);
} catch (error) {
transformStreamErrorWritableAndUnblockWrite(stream, error);
throw readable[kState].storedError;
}
const backpressure =
readableStreamDefaultControllerHasBackpressure(readableController);
if (backpressure !== stream[kState].backpressure) {
assert(backpressure);
transformStreamSetBackpressure(stream, true);
}
}
function transformStreamDefaultControllerError(controller, error) {
transformStreamError(controller[kState].stream, error);
}
async function transformStreamDefaultControllerPerformTransform(controller, chunk) {
try {
return await ensureIsPromise(
controller[kState].transformAlgorithm,
controller,
chunk,
controller);
} catch (error) {
transformStreamError(controller[kState].stream, error);
throw error;
}
}
function transformStreamDefaultControllerTerminate(controller) {
const {
stream,
} = controller[kState];
const {
readable,
} = stream[kState];
assert(readable !== undefined);
const {
controller: readableController,
} = readable[kState];
readableStreamDefaultControllerClose(readableController);
transformStreamErrorWritableAndUnblockWrite(
stream,
new ERR_INVALID_STATE.TypeError('TransformStream has been terminated'));
}
function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
const {
writable,
controller,
} = stream[kState];
assert(writable[kState].state === 'writable');
if (stream[kState].backpressure) {
const backpressureChange = stream[kState].backpressureChange.promise;
return PromisePrototypeThen(
backpressureChange,
() => {
const {
writable,
} = stream[kState];
if (writable[kState].state === 'erroring')
throw writable[kState].storedError;
assert(writable[kState].state === 'writable');
return transformStreamDefaultControllerPerformTransform(
controller,
chunk);
});
}
return transformStreamDefaultControllerPerformTransform(controller, chunk);
}
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
transformStreamError(stream, reason);
}
function transformStreamDefaultSinkCloseAlgorithm(stream) {
const {
readable,
controller,
} = stream[kState];
const flushPromise =
ensureIsPromise(
controller[kState].flushAlgorithm,
controller,
controller);
transformStreamDefaultControllerClearAlgorithms(controller);
return PromisePrototypeThen(
flushPromise,
() => {
if (readable[kState].state === 'errored')
throw readable[kState].storedError;
readableStreamDefaultControllerClose(readable[kState].controller);
},
(error) => {
transformStreamError(stream, error);
throw readable[kState].storedError;
});
}
function transformStreamDefaultSourcePullAlgorithm(stream) {
assert(stream[kState].backpressure);
assert(stream[kState].backpressureChange.promise !== undefined);
transformStreamSetBackpressure(stream, false);
return stream[kState].backpressureChange.promise;
}
module.exports = {
TransformStream,
TransformStreamDefaultController,
TransferredTransformStream,
// Exported Brand Checks
isTransformStream,
isTransformStreamDefaultController,
};