From e936c6048f6ccb486e6a624335a6cc9415c491aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Vondrou=C5=A1?= Date: Fri, 22 May 2026 12:00:18 +0200 Subject: [PATCH] Release ByteBuf on channel write failure in ReactorNettyClient When a Netty channel write fails (e.g. broken connection, RST packet), the ByteBuf produced by FrontendMessage.encode() was never released, causing a direct memory leak. The fix separates encode from send: each ByteBuf is now individually sent via connection.outbound().send(Mono.just(buf)).then(), with doOnError and doOnCancel hooks that release the ByteBuf if its reference count is still positive. The refCnt check prevents double-release because Netty's pipeline releases the ByteBuf automatically on successful write. Both locations are fixed: - The request pipeline (constructor) - The close() method (Terminate message) Reproducer: https://github.com/bugs84/r2dbc-postgresql-bytebuf-leak-reproducer Fixes gh-580 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../postgresql/client/ReactorNettyClient.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index c4c59628..2d0607ae 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -199,7 +199,21 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { if (DEBUG_ENABLED) { logger.debug(this.context.getMessage(String.format("Request: %s", message))); } - return connection.outbound().send(message.encode(this.byteBufAllocator)); + return Flux.from(message.encode(this.byteBufAllocator)) + .flatMap(buf -> + connection.outbound().send(Mono.just(buf)).then() + .doOnError(e -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + .doOnCancel(() -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + ) + .then(); }, 1) .then(); @@ -226,7 +240,23 @@ public Mono close() { return Flux.just(Terminate.INSTANCE) .doOnNext(message -> logger.debug(this.context.getMessage(String.format("Request: %s", message)))) - .concatMap(message -> this.connection.outbound().send(message.encode(this.connection.outbound().alloc()))) + .concatMap(message -> + Flux.from(message.encode(this.connection.outbound().alloc())) + .flatMap(buf -> + this.connection.outbound().send(Mono.just(buf)).then() + .doOnError(e -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + .doOnCancel(() -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + ) + .then() + ) .then() .doOnSuccess(v -> this.connection.dispose()) .then(this.connection.onDispose());