Skip to content

Commit a37cb50

Browse files
feat: add timestamps to MaximumRequestCallbackWaitTimeExceededException
1 parent dcc2a68 commit a37cb50

3 files changed

Lines changed: 94 additions & 6 deletions

File tree

java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(
928928
requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
929929
++this.inflightRequests;
930930
this.inflightBytes += requestWrapper.messageSize;
931+
requestWrapper.placedInWaitingQueueTime = Instant.now();
931932
waitingRequestQueue.addLast(requestWrapper);
932933
healthCheckMetrics.updateWindowedQueuedRequestsMax(
933934
waitingRequestQueue.size() + inflightRequestQueue.size(), queuedRetryCount.get());
@@ -1151,10 +1152,11 @@ private void appendLoop() {
11511152
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
11521153
// Check whether we should error out the current append loop.
11531154
if (inflightRequestQueue.size() > 0) {
1154-
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
1155+
AppendRequestAndResponse firstRequest = inflightRequestQueue.getFirst();
1156+
Instant sendInstant = firstRequest.requestSendTimeStamp;
11551157
if (sendInstant != null) {
11561158
healthCheckMetrics.updateResponseWait(sendInstant);
1157-
throwIfWaitCallbackTooLong(sendInstant);
1159+
throwIfWaitCallbackTooLong(firstRequest);
11581160
}
11591161
}
11601162
healthCheckMetrics.periodicHealthCheck();
@@ -1187,6 +1189,7 @@ private void appendLoop() {
11871189
requestProfilerHook.endOperation(
11881190
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
11891191
waitForBackoffIfNecessary(requestWrapper);
1192+
requestWrapper.placedInInflightQueueTime = Instant.now();
11901193
this.inflightRequestQueue.add(requestWrapper);
11911194
localQueue.addLast(requestWrapper);
11921195
healthCheckMetrics.updateRequestsSent(requestWrapper.messageSize);
@@ -1339,11 +1342,21 @@ private void cleanupConnectionAndRequests(boolean avoidBlocking) {
13391342
log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
13401343
}
13411344

1342-
private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
1345+
private void throwIfWaitCallbackTooLong(AppendRequestAndResponse requestWrapper) {
1346+
Instant timeToCheck = requestWrapper.requestSendTimeStamp;
1347+
if (timeToCheck == null) {
1348+
return;
1349+
}
13431350
Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
13441351
if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
13451352
throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException(
1346-
milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME);
1353+
milliSinceLastCallback,
1354+
writerId,
1355+
MAXIMUM_REQUEST_CALLBACK_WAIT_TIME,
1356+
requestWrapper.requestReceivedTime,
1357+
requestWrapper.placedInWaitingQueueTime,
1358+
requestWrapper.placedInInflightQueueTime,
1359+
requestWrapper.dispatchTimes);
13471360
}
13481361
}
13491362

@@ -1824,6 +1837,11 @@ static final class AppendRequestAndResponse {
18241837
// If a response is no longer expected this is set back to null.
18251838
Instant requestSendTimeStamp;
18261839

1840+
final Instant requestReceivedTime;
1841+
Instant placedInWaitingQueueTime;
1842+
Instant placedInInflightQueueTime;
1843+
final List<Instant> dispatchTimes = new ArrayList<>();
1844+
18271845
AppendRequestAndResponse(
18281846
AppendRowsRequest message,
18291847
StreamWriter streamWriter,
@@ -1852,10 +1870,12 @@ static final class AppendRequestAndResponse {
18521870
this.retryAlgorithm = null;
18531871
}
18541872
this.recordBatchRowCount = recordBatchRowCount;
1873+
this.requestReceivedTime = Instant.now();
18551874
}
18561875

18571876
void setRequestSendQueueTime() {
18581877
requestSendTimeStamp = Instant.now();
1878+
dispatchTimes.add(requestSendTimeStamp);
18591879
}
18601880
}
18611881

java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
package com.google.cloud.bigquery.storage.v1;
1717

1818
import com.google.api.gax.grpc.GrpcStatusCode;
19+
import com.google.common.collect.ImmutableList;
1920
import com.google.common.collect.ImmutableMap;
2021
import com.google.protobuf.Any;
2122
import com.google.protobuf.InvalidProtocolBufferException;
2223
import io.grpc.Status;
2324
import io.grpc.StatusRuntimeException;
2425
import io.grpc.protobuf.StatusProto;
2526
import java.time.Duration;
27+
import java.time.Instant;
28+
import java.util.List;
2629
import java.util.Map;
2730
import java.util.regex.Matcher;
2831
import java.util.regex.Pattern;
@@ -428,17 +431,46 @@ public static class MaximumRequestCallbackWaitTimeExceededException extends Runt
428431
private final Duration callbackWaitTime;
429432
private final String writerId;
430433
private final Duration callbackWaitTimeLimit;
434+
private final Instant requestReceivedTime;
435+
private final Instant placedInWaitingQueueTime;
436+
private final Instant placedInInflightQueueTime;
437+
private final ImmutableList<Instant> dispatchTimes;
431438

439+
@Deprecated
432440
public MaximumRequestCallbackWaitTimeExceededException(
433441
Duration callbackWaitTime, String writerId, Duration callbackWaitTimeLimit) {
442+
this(callbackWaitTime, writerId, callbackWaitTimeLimit, null, null, null, ImmutableList.of());
443+
}
444+
445+
public MaximumRequestCallbackWaitTimeExceededException(
446+
Duration callbackWaitTime,
447+
String writerId,
448+
Duration callbackWaitTimeLimit,
449+
@Nullable Instant requestReceivedTime,
450+
@Nullable Instant placedInWaitingQueueTime,
451+
@Nullable Instant placedInInflightQueueTime,
452+
@Nullable List<Instant> dispatchTimes) {
434453
super(
435454
String.format(
436455
"Request has waited in inflight queue for %sms for writer %s, "
437-
+ "which is over maximum wait time %s",
438-
callbackWaitTime, writerId, callbackWaitTimeLimit.toString()));
456+
+ "which is over maximum wait time %s. "
457+
+ "requestReceivedTime: %s, placedInWaitingQueueTime: %s, "
458+
+ "placedInInflightQueueTime: %s, dispatchTimes: %s",
459+
callbackWaitTime,
460+
writerId,
461+
callbackWaitTimeLimit.toString(),
462+
requestReceivedTime,
463+
placedInWaitingQueueTime,
464+
placedInInflightQueueTime,
465+
dispatchTimes));
439466
this.callbackWaitTime = callbackWaitTime;
440467
this.writerId = writerId;
441468
this.callbackWaitTimeLimit = callbackWaitTimeLimit;
469+
this.requestReceivedTime = requestReceivedTime;
470+
this.placedInWaitingQueueTime = placedInWaitingQueueTime;
471+
this.placedInInflightQueueTime = placedInInflightQueueTime;
472+
this.dispatchTimes =
473+
dispatchTimes == null ? ImmutableList.of() : ImmutableList.copyOf(dispatchTimes);
442474
}
443475

444476
public Duration getCallbackWaitTime() {
@@ -452,6 +484,25 @@ public String getWriterId() {
452484
public Duration getCallbackWaitTimeLimit() {
453485
return callbackWaitTimeLimit;
454486
}
487+
488+
@Nullable
489+
public Instant getRequestReceivedTime() {
490+
return requestReceivedTime;
491+
}
492+
493+
@Nullable
494+
public Instant getPlacedInWaitingQueueTime() {
495+
return placedInWaitingQueueTime;
496+
}
497+
498+
@Nullable
499+
public Instant getPlacedInInflightQueueTime() {
500+
return placedInInflightQueueTime;
501+
}
502+
503+
public ImmutableList<Instant> getDispatchTimes() {
504+
return dispatchTimes;
505+
}
455506
}
456507

457508
private Exceptions() {}

java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,8 +1440,25 @@ void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exceptio
14401440
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
14411441
if (i == 0) {
14421442
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
1443+
assertThat(ex.getCause()).hasMessageThat().contains("requestReceivedTime:");
1444+
assertThat(ex.getCause()).hasMessageThat().contains("placedInWaitingQueueTime:");
1445+
assertThat(ex.getCause()).hasMessageThat().contains("placedInInflightQueueTime:");
1446+
assertThat(ex.getCause()).hasMessageThat().contains("dispatchTimes:");
14431447
assertThat(ex.getCause())
14441448
.isInstanceOf(Exceptions.MaximumRequestCallbackWaitTimeExceededException.class);
1449+
Exceptions.MaximumRequestCallbackWaitTimeExceededException mace =
1450+
(Exceptions.MaximumRequestCallbackWaitTimeExceededException) ex.getCause();
1451+
assertThat(mace.getRequestReceivedTime()).isNotNull();
1452+
assertThat(mace.getPlacedInWaitingQueueTime()).isNotNull();
1453+
assertThat(mace.getPlacedInInflightQueueTime()).isNotNull();
1454+
assertThat(mace.getDispatchTimes()).isNotNull();
1455+
assertThat(mace.getDispatchTimes()).isNotEmpty();
1456+
assertThat(mace.getRequestReceivedTime().isAfter(mace.getPlacedInWaitingQueueTime()))
1457+
.isFalse();
1458+
assertThat(mace.getPlacedInWaitingQueueTime().isAfter(mace.getPlacedInInflightQueueTime()))
1459+
.isFalse();
1460+
assertThat(mace.getPlacedInInflightQueueTime().isAfter(mace.getDispatchTimes().get(0)))
1461+
.isFalse();
14451462
} else {
14461463
assertThat(ex.getCause())
14471464
.hasMessageThat()

0 commit comments

Comments
 (0)