diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index c4c59628..2d0607ae 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -199,7 +199,21 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { if (DEBUG_ENABLED) { logger.debug(this.context.getMessage(String.format("Request: %s", message))); } - return connection.outbound().send(message.encode(this.byteBufAllocator)); + return Flux.from(message.encode(this.byteBufAllocator)) + .flatMap(buf -> + connection.outbound().send(Mono.just(buf)).then() + .doOnError(e -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + .doOnCancel(() -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + ) + .then(); }, 1) .then(); @@ -226,7 +240,23 @@ public Mono close() { return Flux.just(Terminate.INSTANCE) .doOnNext(message -> logger.debug(this.context.getMessage(String.format("Request: %s", message)))) - .concatMap(message -> this.connection.outbound().send(message.encode(this.connection.outbound().alloc()))) + .concatMap(message -> + Flux.from(message.encode(this.connection.outbound().alloc())) + .flatMap(buf -> + this.connection.outbound().send(Mono.just(buf)).then() + .doOnError(e -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + .doOnCancel(() -> { + if (buf.refCnt() > 0) { + ReferenceCountUtil.release(buf); + } + }) + ) + .then() + ) .then() .doOnSuccess(v -> this.connection.dispose()) .then(this.connection.onDispose());