From e2918edf7c72e45b1fe2632fe86fa3324ca5760d Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 19 Jun 2026 18:19:06 -0700 Subject: [PATCH] stream: observe abort while awaiting pipeTo source Use the abort-aware iterator wrapper in the no-transform pipeTo() path so a pending source read does not block AbortSignal handling. Fixes: https://github.com/nodejs/node/issues/64014 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/pull.js | 2 +- .../test-stream-iter-pipeto-signal.js | 34 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 8cca6b31a50e3f..f2ac4033dc051a 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -1090,7 +1090,7 @@ async function pipeTo(source, ...args) { } else if (transforms.length === 0) { // Fast path: no transforms - iterate normalized source directly if (signal) { - for await (const batch of normalized) { + for await (const batch of yieldAbortable(normalized, signal)) { signal.throwIfAborted(); const p = writeBatch(batch); if (p) await p; diff --git a/test/parallel/test-stream-iter-pipeto-signal.js b/test/parallel/test-stream-iter-pipeto-signal.js index 153ee1a80e6176..ec1324a4e04bdc 100644 --- a/test/parallel/test-stream-iter-pipeto-signal.js +++ b/test/parallel/test-stream-iter-pipeto-signal.js @@ -6,6 +6,7 @@ const common = require('../common'); const assert = require('assert'); +const { setTimeout } = require('timers/promises'); const { pipeTo, from } = require('stream/iter'); // pipeTo with live signal, no transforms — abort mid-stream @@ -30,6 +31,38 @@ async function testPipeToLiveSignalNoTransforms() { assert.ok(written.length >= 1); } +// pipeTo with live signal, no transforms — abort while waiting for next chunk +async function testPipeToLiveSignalNoTransformsPendingNext() { + const ac = new AbortController(); + const reason = new Error('abort reason'); + const writer = { + write: common.mustNotCall(), + }; + const source = { + [Symbol.asyncIterator]() { + return { + next() { + return new Promise(() => {}); + }, + }; + }, + }; + + setTimeout(10) + .then(() => ac.abort(reason)) + .then(common.mustCall()); + + const result = await Promise.race([ + assert.rejects( + () => pipeTo(source, writer, { signal: ac.signal }), + reason, + ).then(() => 'aborted'), + setTimeout(1000, 'timed out'), + ]); + + assert.strictEqual(result, 'aborted'); +} + // pipeTo with live signal + transforms — abort mid-stream async function testPipeToLiveSignalWithTransforms() { const ac = new AbortController(); @@ -84,6 +117,7 @@ async function testPipeToLiveSignalWithTransformsCompletes() { Promise.all([ testPipeToLiveSignalNoTransforms(), + testPipeToLiveSignalNoTransformsPendingNext(), testPipeToLiveSignalWithTransforms(), testPipeToLiveSignalCompletes(), testPipeToLiveSignalWithTransformsCompletes(),