Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 103 additions & 47 deletions lib/internal/streams/iter/consumers.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const {
ArrayPrototypeSlice,
Promise,
PromisePrototypeThen,
PromiseResolve,
SafePromiseAllReturnVoid,
SymbolAsyncIterator,
TypedArrayPrototypeGetBuffer,
Expand Down Expand Up @@ -412,19 +413,22 @@ function merge(...args) {

return {
__proto__: null,
async *[SymbolAsyncIterator]() {
[SymbolAsyncIterator]() {
const signal = options?.signal;

signal?.throwIfAborted();

if (normalized.length === 0) return;
if (normalized.length === 0) {
return (async function*() {})();
}

if (normalized.length === 1) {
for await (const batch of normalized[0]) {
signal?.throwIfAborted();
yield batch;
}
return;
return (async function*() {
for await (const batch of normalized[0]) {
signal?.throwIfAborted();
yield batch;
}
})();
}

// Multiple sources - use a ready queue so that batches that settle
Expand All @@ -434,74 +438,65 @@ function merge(...args) {
const ready = [];
let activeCount = normalized.length;
let waitResolve = null;
let lastIterator = null;
let started = false;
let closed = false;

// Called when a source's .next() settles. Pushes the result into
// the ready queue and wakes the consumer if it's waiting.
const onSettled = (iterator, result) => {
if (closed) return;
if (result.done) {
activeCount--;
} else {
ArrayPrototypePush(ready, result.value);
// Immediately request the next value from this source
// (at most one pending .next() per source)
PromisePrototypeThen(
iterator.next(),
(r) => onSettled(iterator, r),
(err) => {
ArrayPrototypePush(ready, { __proto__: null, error: err });
if (waitResolve) {
waitResolve();
waitResolve = null;
}
},
);
ArrayPrototypePush(ready, {
__proto__: null,
iterator,
value: result.value,
});
}
if (waitResolve) {
waitResolve();
waitResolve = null;
}
};

// Start one .next() per source
const iterators = [];
for (let i = 0; i < normalized.length; i++) {
const iterator = normalized[i][SymbolAsyncIterator]();
ArrayPrototypePush(iterators, iterator);
const requestNext = (iterator) => {
PromisePrototypeThen(
iterator.next(),
(r) => onSettled(iterator, r),
(err) => {
if (closed) return;
ArrayPrototypePush(ready, { __proto__: null, error: err });
if (waitResolve) {
waitResolve();
waitResolve = null;
}
},
);
}
};

try {
while (activeCount > 0 || ready.length > 0) {
signal?.throwIfAborted();
const iterators = [];

// Drain ready queue synchronously
while (ready.length > 0) {
const item = ArrayPrototypeShift(ready);
if (item?.error) {
throw item.error;
}
yield item;
}
const start = () => {
if (started) return;
started = true;
for (let i = 0; i < normalized.length; i++) {
const iterator = normalized[i][SymbolAsyncIterator]();
ArrayPrototypePush(iterators, iterator);
requestNext(iterator);
}
};

// If sources are still active, wait for the next settlement
if (activeCount > 0) {
await new Promise((resolve) => {
waitResolve = resolve;
});
}
const cleanup = async () => {
if (closed) {
return { __proto__: null, done: true, value: undefined };
}
closed = true;
if (waitResolve) {
waitResolve();
waitResolve = null;
}
} finally {
// Clean up: return all iterators
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
if (iterator.return) {
try {
Expand All @@ -511,7 +506,68 @@ function merge(...args) {
}
}
});
}
return { __proto__: null, done: true, value: undefined };
};

const nextImpl = async () => {
try {
if (closed) {
return cleanup();
}

signal?.throwIfAborted();

start();

if (lastIterator !== null) {
requestNext(lastIterator);
lastIterator = null;
}

while (activeCount > 0 || ready.length > 0) {
signal?.throwIfAborted();

if (ready.length > 0) {
const item = ArrayPrototypeShift(ready);
if (item?.error) {
await cleanup();
throw item.error;
}
lastIterator = item.iterator;
return { __proto__: null, done: false, value: item.value };
}

await new Promise((resolve) => { waitResolve = resolve; });
}

return cleanup();
} catch (err) {
await cleanup();
throw err;
}
};

let nextQueue = PromiseResolve();
const enqueue = (fn) => {
const result = PromisePrototypeThen(nextQueue, fn, fn);
nextQueue = PromisePrototypeThen(result, () => {}, () => {});
return result;
};

return {
__proto__: null,
[SymbolAsyncIterator]() {
return this;
},

next() {
return enqueue(nextImpl);
},

return() {
return enqueue(cleanup);
},
};
},
};
}
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-stream-iter-consumers-merge.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,31 @@ async function testMergeConsumerBreak() {
assert.strictEqual(source1Return && source2Return, true);
}

async function testMergeDoesNotDrainIdleSources() {
function source(n) {
return {
pulls: 0,
async *[Symbol.asyncIterator]() {
while (this.pulls < n) {
yield [Buffer.from(`${++this.pulls}`)];
}
},
};
}

const source1 = source(5);
const source2 = source(5);
const iterator = merge(source1, source2)[Symbol.asyncIterator]();

await iterator.next();
await new Promise((resolve) => setTimeout(resolve, 20));

assert.ok(source1.pulls <= 1);
assert.ok(source2.pulls <= 1);

await iterator.return?.();
}

async function testMergeSignalMidIteration() {
const ac = new AbortController();
async function* slowSource() {
Expand Down Expand Up @@ -190,6 +215,7 @@ Promise.all([
testMergeSyncSources(),
testMergeSourceError(),
testMergeConsumerBreak(),
testMergeDoesNotDrainIdleSources(),
testMergeSignalMidIteration(),
testMergeStringSources(),
testMergeObjectLikeSources(),
Expand Down
Loading