Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 47 additions & 36 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2480,21 +2480,26 @@ function readableStreamDefaultControllerClose(controller) {
}

function readableStreamDefaultControllerEnqueue(controller, chunk) {
if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller))
// Equivalent to readableStreamDefaultControllerCanCloseOrEnqueue()
// followed by isReadableStreamLocked() and
// readableStreamGetNumReadRequests(), but with the state loaded once:
// this runs for every enqueued chunk.
const controllerState = controller[kState];
const stream = controllerState.stream;
if (controllerState.closeRequested || stream[kState].state !== 'readable')
return;

const {
stream,
} = controller[kState];

if (isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream)) {
const reader = stream[kState].reader;
if (reader !== undefined &&
reader[kState] !== undefined &&
reader[kType] === 'ReadableStreamDefaultReader' &&
reader[kState].readRequests.length) {
readableStreamFulfillReadRequest(stream, chunk, false);
} else {
try {
const chunkSize =
FunctionPrototypeCall(
controller[kState].sizeAlgorithm,
controllerState.sizeAlgorithm,
undefined,
chunk);
enqueueValueWithSize(controller, chunk, chunkSize);
Expand Down Expand Up @@ -2533,22 +2538,27 @@ function readableStreamDefaultControllerGetDesiredSize(controller) {
}

function readableStreamDefaultControllerShouldCallPull(controller) {
const {
stream,
} = controller[kState];
if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
!controller[kState].started)
// Single-pass version of the spec's predicate chain (CanCloseOrEnqueue,
// IsLocked, HasDefaultReader, GetNumReadRequests, GetDesiredSize): this
// runs at least once per chunk on every default-stream path. The
// desired-size computation is inlined because the stream state is
// already known to be 'readable' here.
const controllerState = controller[kState];
const stream = controllerState.stream;
if (controllerState.closeRequested ||
stream[kState].state !== 'readable' ||
!controllerState.started)
return false;

if (isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream)) {
const reader = stream[kState].reader;
if (reader !== undefined &&
reader[kState] !== undefined &&
reader[kType] === 'ReadableStreamDefaultReader' &&
reader[kState].readRequests.length) {
return true;
}

const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
assert(desiredSize !== null);

return desiredSize > 0;
return controllerState.highWaterMark - controllerState.queueTotalSize > 0;
}

function readableStreamDefaultControllerCallPullIfNeeded(controller) {
Expand Down Expand Up @@ -2798,28 +2808,29 @@ function readableByteStreamControllerGetDesiredSize(controller) {
}

function readableByteStreamControllerShouldCallPull(controller) {
const {
stream,
} = controller[kState];
// Single-pass version of the spec's predicate chain (HasDefaultReader,
// GetNumReadRequests, HasBYOBReader, GetNumReadIntoRequests,
// GetDesiredSize): this runs at least once per chunk on every byte
// stream path. The desired-size computation is inlined because the
// stream state is already known to be 'readable' here.
const controllerState = controller[kState];
const stream = controllerState.stream;
if (stream[kState].state !== 'readable' ||
controller[kState].closeRequested ||
!controller[kState].started) {
controllerState.closeRequested ||
!controllerState.started) {
return false;
}
if (readableStreamHasDefaultReader(stream) &&
readableStreamGetNumReadRequests(stream) > 0) {
return true;
}

if (readableStreamHasBYOBReader(stream) &&
readableStreamGetNumReadIntoRequests(stream) > 0) {
return true;
const reader = stream[kState].reader;
if (reader !== undefined && reader[kState] !== undefined) {
const type = reader[kType];
if (type === 'ReadableStreamDefaultReader') {
if (reader[kState].readRequests.length) return true;
} else if (type === 'ReadableStreamBYOBReader') {
if (reader[kState].readIntoRequests.length) return true;
}
}

const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
assert(desiredSize !== null);

return desiredSize > 0;
return controllerState.highWaterMark - controllerState.queueTotalSize > 0;
}

function readableByteStreamControllerHandleQueueDrain(controller) {
Expand Down
44 changes: 24 additions & 20 deletions lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,7 @@ function InternalTransferredTransformStream() {
readable: undefined,
writable: undefined,
backpressure: undefined,
backpressureChange: {
__proto__: null,
promise: undefined,
resolve: undefined,
reject: undefined,
},
backpressureChange: undefined,
controller: undefined,
};
}
Expand Down Expand Up @@ -390,12 +385,7 @@ function initializeTransformStream(
writable,
controller: undefined,
backpressure: undefined,
backpressureChange: {
__proto__: null,
promise: undefined,
resolve: undefined,
reject: undefined,
},
backpressureChange: undefined,
};

transformStreamSetBackpressure(stream, true);
Expand Down Expand Up @@ -429,12 +419,27 @@ function transformStreamUnblockWrite(stream) {
transformStreamSetBackpressure(stream, false);
}

// The spec's [[backpressureChangePromise]] is only ever observed by the
// source pull algorithm (settles when backpressure next becomes true) and
// by a sink write arriving while backpressure is set (settles when
// backpressure next becomes false). Instead of allocating a fresh promise
// record on every flip, the record is materialized lazily on first
// observation and dropped once settled; flips nobody is waiting on
// allocate nothing.
function transformStreamBackpressureChangePromise(stream) {
const state = stream[kState];
return (state.backpressureChange ??= PromiseWithResolvers()).promise;
}

function transformStreamSetBackpressure(stream, backpressure) {
assert(stream[kState].backpressure !== backpressure);
if (stream[kState].backpressureChange.promise !== undefined)
stream[kState].backpressureChange.resolve?.();
stream[kState].backpressureChange = PromiseWithResolvers();
stream[kState].backpressure = backpressure;
const state = stream[kState];
assert(state.backpressure !== backpressure);
const backpressureChange = state.backpressureChange;
if (backpressureChange !== undefined) {
state.backpressureChange = undefined;
backpressureChange.resolve();
}
state.backpressure = backpressure;
}

function setupTransformStreamDefaultController(
Expand Down Expand Up @@ -554,7 +559,7 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
} = stream[kState];
assert(writable[kState].state === 'writable');
if (stream[kState].backpressure) {
const backpressureChange = stream[kState].backpressureChange.promise;
const backpressureChange = transformStreamBackpressureChangePromise(stream);
return PromisePrototypeThen(
backpressureChange,
() => {
Expand Down Expand Up @@ -638,9 +643,8 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {

function transformStreamDefaultSourcePullAlgorithm(stream) {
assert(stream[kState].backpressure);
assert(stream[kState].backpressureChange.promise !== undefined);
transformStreamSetBackpressure(stream, false);
return stream[kState].backpressureChange.promise;
return transformStreamBackpressureChangePromise(stream);
}

function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
Expand Down
37 changes: 18 additions & 19 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,45 +134,44 @@ function isBrandCheck(brand) {
};
}

// The queue helpers below run once per chunk on the hot paths of every
// default readable/writable stream, so they load the controller state a
// single time and don't assert the existence of the queue fields (both
// are unconditionally initialized during controller setup and only ever
// replaced wholesale).
function dequeueValue(controller) {
assert(controller[kState].queue !== undefined);
assert(controller[kState].queueTotalSize !== undefined);
assert(controller[kState].queue.length);
const state = controller[kState];
assert(state.queue.length);
const {
value,
size,
} = ArrayPrototypeShift(controller[kState].queue);
controller[kState].queueTotalSize =
MathMax(0, controller[kState].queueTotalSize - size);
} = ArrayPrototypeShift(state.queue);
state.queueTotalSize = MathMax(0, state.queueTotalSize - size);
return value;
}

function resetQueue(controller) {
assert(controller[kState].queue !== undefined);
assert(controller[kState].queueTotalSize !== undefined);
controller[kState].queue = [];
controller[kState].queueTotalSize = 0;
const state = controller[kState];
state.queue = [];
state.queueTotalSize = 0;
}

function peekQueueValue(controller) {
assert(controller[kState].queue !== undefined);
assert(controller[kState].queueTotalSize !== undefined);
assert(controller[kState].queue.length);
return controller[kState].queue[0].value;
const state = controller[kState];
assert(state.queue.length);
return state.queue[0].value;
}

function enqueueValueWithSize(controller, value, size) {
assert(controller[kState].queue !== undefined);
assert(controller[kState].queueTotalSize !== undefined);
const state = controller[kState];
const coercedSize = +size;
if (NumberIsNaN(coercedSize) ||
coercedSize < 0 ||
coercedSize === Infinity) {
throw new ERR_INVALID_ARG_VALUE.RangeError('size', size);
}
size = coercedSize;
ArrayPrototypePush(controller[kState].queue, { value, size });
controller[kState].queueTotalSize += size;
ArrayPrototypePush(state.queue, { value, size: coercedSize });
state.queueTotalSize += coercedSize;
}

// Arity-specialized variants of the promise-callback wrapper. The generic
Expand Down
Loading
Loading