From d23d383cd727de86cc376a10edb7c88c38db7a20 Mon Sep 17 00:00:00 2001 From: Yenfry Herrera Feliz Date: Fri, 24 Apr 2026 10:59:54 -0400 Subject: [PATCH] fix: defer debug resource close on streamed bodies When a command sets `@http.stream = true`, Guzzle's StreamHandler keeps a `stream_notification` callback that writes to the `@http.debug` resource on every read of the response body. `TraceMiddleware::flushHttpDebug()` closed that resource as soon as the middleware chain resolved, so later reads driven by the caller (e.g. the S3 stream wrapper via `copy("s3://...", ...)`) triggered "fprintf(): supplied resource is not a valid stream resource" warnings. Transfer ownership of the debug resource to the response body via a new internal PSR-7 decorator, `Aws\DebugResourceBoundStream`, which closes it exactly once when the body is closed or destroyed. Non-streamed paths still close eagerly. --- .../fix-trace-middleware-debug-stream.json | 7 + src/DebugResourceBoundStream.php | 62 +++++++ src/TraceMiddleware.php | 37 ++++- tests/DebugResourceBoundStreamTest.php | 154 ++++++++++++++++++ tests/TraceMiddlewareTest.php | 139 ++++++++++++++++ 5 files changed, 391 insertions(+), 8 deletions(-) create mode 100644 .changes/nextrelease/fix-trace-middleware-debug-stream.json create mode 100644 src/DebugResourceBoundStream.php create mode 100644 tests/DebugResourceBoundStreamTest.php diff --git a/.changes/nextrelease/fix-trace-middleware-debug-stream.json b/.changes/nextrelease/fix-trace-middleware-debug-stream.json new file mode 100644 index 0000000000..e4f354fd50 --- /dev/null +++ b/.changes/nextrelease/fix-trace-middleware-debug-stream.json @@ -0,0 +1,7 @@ +[ + { + "type": "bugfix", + "category": "TraceMiddleware", + "description": "Defer closing the `@http.debug` resource on streamed response bodies so Guzzle's stream_notification callback can keep writing to it while the body is being read. Fixes \"fprintf(): supplied resource is not a valid stream resource\" warnings when using the S3 stream wrapper with debug enabled." + } +] diff --git a/src/DebugResourceBoundStream.php b/src/DebugResourceBoundStream.php new file mode 100644 index 0000000000..8cbd1edf71 --- /dev/null +++ b/src/DebugResourceBoundStream.php @@ -0,0 +1,62 @@ +stream = $stream; + $this->debugResource = $debugResource; + } + + public function close(): void + { + $this->stream->close(); + $this->releaseDebugResource(); + } + + public function __destruct() + { + $this->releaseDebugResource(); + } + + private function releaseDebugResource(): void + { + if (is_resource($this->debugResource)) { + @fclose($this->debugResource); + } + $this->debugResource = null; + } +} diff --git a/src/TraceMiddleware.php b/src/TraceMiddleware.php index 032043b8e3..7baa52a2c6 100644 --- a/src/TraceMiddleware.php +++ b/src/TraceMiddleware.php @@ -97,7 +97,7 @@ public function __invoke($step, $name) return $next($command, $request)->then( function ($value) use ($step, $name, $command, $start) { - $this->flushHttpDebug($command); + $value = $this->flushHttpDebug($command, $value); $this->stepOutput($start, [ 'step' => $step, 'name' => $name, @@ -284,16 +284,37 @@ private function createHttpDebug(CommandInterface $command) } } - private function flushHttpDebug(CommandInterface $command) + private function flushHttpDebug(CommandInterface $command, $value = null) { - if ($res = $command['@http']['debug']) { - if (is_resource($res)) { - rewind($res); - $this->write(stream_get_contents($res)); - fclose($res); + $res = $command['@http']['debug'] ?? null; + if (!is_resource($res)) { + if ($res !== null) { + $command['@http']['debug'] = null; } - $command['@http']['debug'] = null; + return $value; + } + + rewind($res); + $this->write(stream_get_contents($res)); + $command['@http']['debug'] = null; + + // When the response body is streamed, Guzzle's StreamHandler has + // captured $res in a stream_notification callback that fires on every + // read of the body. Closing it here would raise warnings once + // the caller (e.g. S3 stream wrapper) reads the body. Transfer + // ownership to the body so the resource closes with it. + if ( + !empty($command['@http']['stream']) + && $value instanceof ResultInterface + && ($body = $value['Body'] ?? null) instanceof StreamInterface + && !$body instanceof DebugResourceBoundStream + ) { + $value['Body'] = new DebugResourceBoundStream($body, $res); + return $value; } + + fclose($res); + return $value; } private function write($value) diff --git a/tests/DebugResourceBoundStreamTest.php b/tests/DebugResourceBoundStreamTest.php new file mode 100644 index 0000000000..3307a0029d --- /dev/null +++ b/tests/DebugResourceBoundStreamTest.php @@ -0,0 +1,154 @@ +assertSame('hello', $stream->read(5)); + $this->assertSame(' world', $stream->read(6)); + $this->assertSame('', $stream->read(1)); // past end → triggers EOF + $this->assertTrue($stream->eof()); + $this->assertTrue(is_resource($debug), 'reads must not close the debug FD'); + + $stream->close(); + } + + public function testToStringDelegates() + { + $inner = Utils::streamFor('payload'); + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + $this->assertSame('payload', (string) $stream); + + $stream->close(); + } + + public function testGetSizeAndMetadataDelegate() + { + $inner = Utils::streamFor('abcdef'); + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + $this->assertSame(6, $stream->getSize()); + $this->assertIsArray($stream->getMetadata()); + + $stream->close(); + } + + public function testCloseReleasesDebugResource() + { + $inner = Utils::streamFor('data'); + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + $this->assertTrue(is_resource($debug)); + $stream->close(); + $this->assertFalse( + is_resource($debug), + 'close() must fclose the debug FD' + ); + } + + public function testCloseAlsoClosesInnerStream() + { + $innerHandle = fopen('php://temp', 'w+'); + fwrite($innerHandle, 'x'); + rewind($innerHandle); + $inner = Utils::streamFor($innerHandle); + + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + $stream->close(); + $this->assertFalse(is_resource($innerHandle), 'inner stream must also close'); + $this->assertFalse(is_resource($debug)); + } + + public function testDestructReleasesDebugResource() + { + $inner = Utils::streamFor('abc'); + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + unset($stream); + $this->assertFalse( + is_resource($debug), + '__destruct must fclose the debug FD' + ); + } + + public function testDoubleCloseIsSafe() + { + $inner = Utils::streamFor('abc'); + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + $stream->close(); + // Second close must not warn or throw on the already-closed FD. + $stream->close(); + + $this->assertFalse(is_resource($debug)); + } + + public function testCloseFollowedByDestructIsSafe() + { + $inner = Utils::streamFor('abc'); + $debug = fopen('php://temp', 'w+'); + $stream = new DebugResourceBoundStream($inner, $debug); + + $stream->close(); + // Destructor should see a null debug resource and do nothing. + unset($stream); + + $this->assertFalse(is_resource($debug)); + } + + public function testAlreadyClosedDebugResourceIsToleratedAtDestruct() + { + $inner = Utils::streamFor('abc'); + $debug = fopen('php://temp', 'w+'); + + $stream = new DebugResourceBoundStream($inner, $debug); + + // Simulate a caller who closed the FD out of band — this is the + // scenario that originally caused the regression (#2856). + fclose($debug); + $this->assertFalse(is_resource($debug)); + + $caught = []; + set_error_handler(static function ($severity, $message) use (&$caught) { + $caught[] = $message; + return true; + }); + try { + unset($stream); + } finally { + restore_error_handler(); + } + + $this->assertSame([], $caught); + } + + public function testImplementsStreamInterface() + { + $stream = new DebugResourceBoundStream( + Utils::streamFor(''), + fopen('php://temp', 'w+') + ); + $this->assertInstanceOf(StreamInterface::class, $stream); + $stream->close(); + } +} diff --git a/tests/TraceMiddlewareTest.php b/tests/TraceMiddlewareTest.php index 57abeb6708..d60bbc906b 100644 --- a/tests/TraceMiddlewareTest.php +++ b/tests/TraceMiddlewareTest.php @@ -4,6 +4,7 @@ use Aws\Api\Service; use Aws\AwsClient; use Aws\Command; +use Aws\DebugResourceBoundStream; use Aws\EventBridge\EventBridgeClient; use Aws\Exception\AwsException; use Aws\HandlerList; @@ -13,7 +14,9 @@ use GuzzleHttp\Promise\RejectedPromise; use GuzzleHttp\Psr7\Request; use GuzzleHttp\Psr7\Response; +use GuzzleHttp\Psr7\Utils; use GuzzleHttp\Promise; +use Psr\Http\Message\StreamInterface; use Yoast\PHPUnitPolyfills\TestCases\TestCase; use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\Attributes\CoversClass; @@ -296,6 +299,142 @@ public function testCanScrubOnArbitraryPatterns() } } + /** + * @return array{0: HandlerList, 1: resource|null} + */ + private function buildStreamedHandlerList(StreamInterface $body, &$capturedRes): HandlerList + { + $list = new HandlerList(); + $list->setHandler(function ($cmd, $req) use ($body, &$capturedRes) { + $capturedRes = $cmd['@http']['debug'] ?? null; + return Promise\Create::promiseFor(new Result(['Body' => $body])); + }); + // TraceMiddleware is interposed around each middleware step, so we + // need at least one middleware for it to wrap. + $list->appendInit(function ($handler) { + return function ($cmd, $req = null) use ($handler) { + return $handler($cmd, $req); + }; + }); + $list->interpose(new TraceMiddleware(['logfn' => function () {}])); + + return $list; + } + + public function testStreamedBodyDefersDebugResourceClose() + { + $payload = 'streamed-response-body'; + $body = Utils::streamFor($payload); + $capturedRes = null; + + $list = $this->buildStreamedHandlerList($body, $capturedRes); + + $handler = $list->resolve(); + $command = new Command('GetObject'); + $command['@http']['stream'] = true; + + /** @var Result $result */ + $result = $handler($command, new Request('GET', 'http://foo/'))->wait(); + + $this->assertInstanceOf(DebugResourceBoundStream::class, $result['Body']); + $this->assertTrue(is_resource($capturedRes), 'debug FD must remain open while body is live'); + $this->assertSame($payload, (string) $result['Body']); + + $result['Body']->close(); + $this->assertFalse(is_resource($capturedRes), 'debug FD must close when body closes'); + } + + public function testNonStreamedCommandStillClosesDebugResourceEagerly() + { + $capturedRes = null; + $list = $this->buildStreamedHandlerList(Utils::streamFor('x'), $capturedRes); + + $handler = $list->resolve(); + $command = new Command('GetObject'); // no @http.stream flag + + $result = $handler($command, new Request('GET', 'http://foo/'))->wait(); + + $this->assertNotInstanceOf(DebugResourceBoundStream::class, $result['Body']); + $this->assertFalse(is_resource($capturedRes), 'non-streamed commands must close eagerly'); + } + + public function testRejectedPromiseClosesDebugResource() + { + $capturedRes = null; + + $list = new HandlerList(); + $list->setHandler(function ($cmd, $req) use (&$capturedRes) { + $capturedRes = $cmd['@http']['debug'] ?? null; + return new RejectedPromise(new \RuntimeException('boom')); + }); + $list->appendInit(function ($handler) { + return function ($cmd, $req = null) use ($handler) { + return $handler($cmd, $req); + }; + }); + $list->interpose(new TraceMiddleware(['logfn' => function () {}])); + + $handler = $list->resolve(); + $command = new Command('GetObject'); + $command['@http']['stream'] = true; + + try { + $handler($command, new Request('GET', 'http://foo/'))->wait(); + $this->fail('expected rejection'); + } catch (\RuntimeException $e) { + // expected + } + + $this->assertFalse(is_resource($capturedRes), 'rejected path must still fclose'); + } + + /** + * after the middleware chain returns, reads on the + * streamed body must not emit warnings — Guzzle's stream_notification + * closure still references the trace debug FD. Previously the FD was + * fclose()d inside flushHttpDebug, and fread()s on the body raised + * "fprintf(): supplied resource is not a valid stream resource". + */ + public function testStreamedBodyReadAfterFlushDoesNotEmitWarnings() + { + $debugFd = null; + $list = $this->buildStreamedHandlerList( + Utils::streamFor('streamed-response-body'), + $debugFd + ); + + $handler = $list->resolve(); + $command = new Command('GetObject'); + $command['@http']['stream'] = true; + + /** @var Result $result */ + $result = $handler($command, new Request('GET', 'http://foo/'))->wait(); + + // Simulate Guzzle's notifier firing on every read of the body. + $body = $result['Body']; + $this->assertInstanceOf(StreamInterface::class, $body); + + $caught = []; + set_error_handler(static function ($severity, $message) use (&$caught) { + $caught[] = $message; + return true; + }); + try { + while (!$body->eof()) { + $chunk = $body->read(4); + if ($chunk === '') { + break; + } + // Mirrors GuzzleHttp\Handler\StreamHandler::add_debug() closure: + @fprintf($debugFd, " [PROGRESS]\n"); + } + } finally { + restore_error_handler(); + } + + $this->assertSame([], $caught, 'no PHP warnings should be emitted'); + } + private function generateTestClient(Service $service, $args = []) { return new AwsClient(