diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 8cca6b31a50e3f..f2ac4033dc051a 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -1090,7 +1090,7 @@ async function pipeTo(source, ...args) { } else if (transforms.length === 0) { // Fast path: no transforms - iterate normalized source directly if (signal) { - for await (const batch of normalized) { + for await (const batch of yieldAbortable(normalized, signal)) { signal.throwIfAborted(); const p = writeBatch(batch); if (p) await p; diff --git a/test/parallel/test-stream-iter-pipeto-signal.js b/test/parallel/test-stream-iter-pipeto-signal.js index 153ee1a80e6176..ec1324a4e04bdc 100644 --- a/test/parallel/test-stream-iter-pipeto-signal.js +++ b/test/parallel/test-stream-iter-pipeto-signal.js @@ -6,6 +6,7 @@ const common = require('../common'); const assert = require('assert'); +const { setTimeout } = require('timers/promises'); const { pipeTo, from } = require('stream/iter'); // pipeTo with live signal, no transforms — abort mid-stream @@ -30,6 +31,38 @@ async function testPipeToLiveSignalNoTransforms() { assert.ok(written.length >= 1); } +// pipeTo with live signal, no transforms — abort while waiting for next chunk +async function testPipeToLiveSignalNoTransformsPendingNext() { + const ac = new AbortController(); + const reason = new Error('abort reason'); + const writer = { + write: common.mustNotCall(), + }; + const source = { + [Symbol.asyncIterator]() { + return { + next() { + return new Promise(() => {}); + }, + }; + }, + }; + + setTimeout(10) + .then(() => ac.abort(reason)) + .then(common.mustCall()); + + const result = await Promise.race([ + assert.rejects( + () => pipeTo(source, writer, { signal: ac.signal }), + reason, + ).then(() => 'aborted'), + setTimeout(1000, 'timed out'), + ]); + + assert.strictEqual(result, 'aborted'); +} + // pipeTo with live signal + transforms — abort mid-stream async function testPipeToLiveSignalWithTransforms() { const ac = new AbortController(); @@ -84,6 +117,7 @@ async function testPipeToLiveSignalWithTransformsCompletes() { Promise.all([ testPipeToLiveSignalNoTransforms(), + testPipeToLiveSignalNoTransformsPendingNext(), testPipeToLiveSignalWithTransforms(), testPipeToLiveSignalCompletes(), testPipeToLiveSignalWithTransformsCompletes(),