Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ async function pipeTo(source, ...args) {
} else if (transforms.length === 0) {
// Fast path: no transforms - iterate normalized source directly
if (signal) {
for await (const batch of normalized) {
for await (const batch of yieldAbortable(normalized, signal)) {
signal.throwIfAborted();
const p = writeBatch(batch);
if (p) await p;
Expand Down
34 changes: 34 additions & 0 deletions test/parallel/test-stream-iter-pipeto-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

const common = require('../common');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { pipeTo, from } = require('stream/iter');

// pipeTo with live signal, no transforms — abort mid-stream
Expand All @@ -30,6 +31,38 @@ async function testPipeToLiveSignalNoTransforms() {
assert.ok(written.length >= 1);
}

// pipeTo with live signal, no transforms — abort while waiting for next chunk
async function testPipeToLiveSignalNoTransformsPendingNext() {
const ac = new AbortController();
const reason = new Error('abort reason');
const writer = {
write: common.mustNotCall(),
};
const source = {
[Symbol.asyncIterator]() {
return {
next() {
return new Promise(() => {});
},
};
},
};

setTimeout(10)
.then(() => ac.abort(reason))
.then(common.mustCall());

const result = await Promise.race([
assert.rejects(
() => pipeTo(source, writer, { signal: ac.signal }),
reason,
).then(() => 'aborted'),
setTimeout(1000, 'timed out'),
]);

assert.strictEqual(result, 'aborted');
}

// pipeTo with live signal + transforms — abort mid-stream
async function testPipeToLiveSignalWithTransforms() {
const ac = new AbortController();
Expand Down Expand Up @@ -84,6 +117,7 @@ async function testPipeToLiveSignalWithTransformsCompletes() {

Promise.all([
testPipeToLiveSignalNoTransforms(),
testPipeToLiveSignalNoTransformsPendingNext(),
testPipeToLiveSignalWithTransforms(),
testPipeToLiveSignalCompletes(),
testPipeToLiveSignalWithTransformsCompletes(),
Expand Down
Loading