diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index 1162439bf88c3a..2c21a076dfc9d8 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -39,6 +39,10 @@ const { validateObject, } = require('internal/validators'); +const { + markPromiseAsHandled, +} = internalBinding('util'); + const { from, fromSync, @@ -434,6 +438,20 @@ function merge(...args) { const ready = []; let activeCount = normalized.length; let waitResolve = null; + let onAbort; + + if (signal) { + onAbort = () => { + if (waitResolve) { + waitResolve(); + waitResolve = null; + } + }; + signal.addEventListener('abort', onAbort, { + __proto__: null, + once: true, + }); + } // Called when a source's .next() settles. Pushes the result into // the ready queue and wakes the consumer if it's waiting. @@ -498,27 +516,43 @@ function merge(...args) { if (activeCount > 0) { await new Promise((resolve) => { waitResolve = resolve; + if (signal?.aborted) { + waitResolve = null; + resolve(); + } }); } } } catch (err) { primaryError = err; } finally { + if (onAbort !== undefined) { + signal.removeEventListener('abort', onAbort); + } // Clean up: return all iterators. Cleanup errors are not // swallowed - a broken iterator.return() (e.g., failing to // release a resource) should be visible to the caller. - await cleanupIterators(iterators, primaryError); + await cleanupIterators( + iterators, + primaryError, + signal?.aborted && primaryError === signal.reason, + ); } }, }; } -async function cleanupIterators(iterators, primaryError) { +async function cleanupIterators(iterators, primaryError, skipAwaitCleanup) { let cleanupError; await SafePromiseAllReturnVoid(iterators, async (iterator) => { if (iterator.return) { try { - await iterator.return(); + const result = iterator.return(); + if (skipAwaitCleanup) { + markPromiseAsHandled(result); + } else { + await result; + } } catch (err) { // Keep the first cleanup error encountered. cleanupError ??= err; diff --git a/test/parallel/test-stream-iter-consumers-merge.js b/test/parallel/test-stream-iter-consumers-merge.js index 97cc8b9ac89477..b551599f731462 100644 --- a/test/parallel/test-stream-iter-consumers-merge.js +++ b/test/parallel/test-stream-iter-consumers-merge.js @@ -151,6 +151,25 @@ async function testMergeSignalMidIteration() { await assert.rejects(() => iter.next(), { name: 'AbortError' }); } +async function testMergeSignalDuringPendingMultiSourceRead() { + const ac = new AbortController(); + + async function* pending() { + await new Promise(() => {}); + yield []; + } + + const iter = merge(pending(), pending(), { + __proto__: null, + signal: ac.signal, + })[Symbol.asyncIterator](); + + const next = iter.next(); + ac.abort(); + + await assert.rejects(next, { name: 'AbortError' }); +} + // merge() accepts string sources (normalized via from()) async function testMergeStringSources() { const batches = []; @@ -286,6 +305,7 @@ Promise.all([ testMergeSourceError(), testMergeConsumerBreak(), testMergeSignalMidIteration(), + testMergeSignalDuringPendingMultiSourceRead(), testMergeStringSources(), testMergeObjectLikeSources(), testMergeCleanupErrorOnly(),