Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import io.netty.channel.ChannelPipeline;
import io.vertx.core.Completable;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.net.NetSocketInternal;
Expand Down Expand Up @@ -43,7 +42,6 @@ public class DB2SocketConnection extends SocketConnectionBase {

private final DB2ConnectOptions connectOptions;
private DB2Codec codec;
private Handler<Void> closeHandler;
public final ConnectionMetaData connMetadata = new ConnectionMetaData();
public ConnectionState status = ConnectionState.CONNECTING;

Expand Down Expand Up @@ -128,14 +126,6 @@ protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
}
}

@Override
public void handleClose(Throwable t) {
super.handleClose(t);
if (closeHandler != null) {
context().runOnContext(closeHandler);
}
}

@Override
public String system() {
return "db2";
Expand All @@ -146,8 +136,4 @@ public DatabaseMetadata databaseMetadata() {
return connMetadata.getDbMetadata();
}

public DB2SocketConnection closeHandler(Handler<Void> handler) {
closeHandler = handler;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,6 @@ protected void handleMessage(Object msg) {
}
}

@Override
protected void handleException(Throwable t) {
if (t instanceof PgException) {
reportException(t);
} else {
super.handleException(t);
}
}

public int getProcessId() {
return processId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ public void testCancelRequest(TestContext ctx) {

@Test
public void testInflightCommandsFailWhenConnectionClosed(TestContext ctx) {
Async async = ctx.async();
connector.accept(ctx.asyncAssertSuccess(conn1 -> {
conn1.exceptionHandler(err -> {
async.complete();
});
conn1
.query("SELECT pg_backend_pid()")
.execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public enum Status {
protected final NetSocketInternal socket;
protected Status status = Status.CONNECTED;

private Throwable exceptionReport;

public SocketConnectionBase(NetSocketInternal socket,
ClientMetrics metrics,
boolean cachePreparedStatements,
Expand Down Expand Up @@ -147,13 +149,7 @@ public io.vertx.core.Context context() {
public void init() {
socket.closeHandler(this::handleClosed);
socket.exceptionHandler(this::handleException);
socket.messageHandler(msg -> {
try {
handleMessage(msg);
} catch (Exception e) {
handleException(e);
}
});
socket.messageHandler(this::handleMessage);
socket.readCompletionHandler(this::handleReadComplete);
}

Expand Down Expand Up @@ -413,26 +409,26 @@ private void removeCachedStatement(String sql) {
}

private void handleClosed(Void v) {
handleClose(null);
handleClose(exceptionReport);
}

protected void handleException(Throwable t) {
private void handleException(Throwable t) {
if (t instanceof DecoderException) {
DecoderException err = (DecoderException) t;
t = err.getCause();
}
handleClose(t);
exceptionReport = t;
}

protected void reportException(Throwable t) {
private void reportException(Throwable t) {
synchronized (this) {
if (holder != null) {
holder.handleException(t);
}
}
}

protected void handleClose(Throwable t) {
private void handleClose(Throwable t) {
if (status != Status.CLOSED) {
status = Status.CLOSED;
if (metrics != null) {
Expand Down