Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(
requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
requestWrapper.placedInWaitingQueueTime = Instant.now();
waitingRequestQueue.addLast(requestWrapper);
healthCheckMetrics.updateWindowedQueuedRequestsMax(
waitingRequestQueue.size() + inflightRequestQueue.size(), queuedRetryCount.get());
Expand Down Expand Up @@ -1151,10 +1152,11 @@ private void appendLoop() {
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
// Check whether we should error out the current append loop.
if (inflightRequestQueue.size() > 0) {
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
AppendRequestAndResponse firstRequest = inflightRequestQueue.getFirst();
Instant sendInstant = firstRequest.requestSendTimeStamp;
if (sendInstant != null) {
healthCheckMetrics.updateResponseWait(sendInstant);
throwIfWaitCallbackTooLong(sendInstant);
throwIfWaitCallbackTooLong(firstRequest);
}
}
healthCheckMetrics.periodicHealthCheck();
Expand Down Expand Up @@ -1187,6 +1189,7 @@ private void appendLoop() {
requestProfilerHook.endOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
waitForBackoffIfNecessary(requestWrapper);
requestWrapper.placedInInflightQueueTime = Instant.now();
this.inflightRequestQueue.add(requestWrapper);
localQueue.addLast(requestWrapper);
healthCheckMetrics.updateRequestsSent(requestWrapper.messageSize);
Expand Down Expand Up @@ -1339,11 +1342,21 @@ private void cleanupConnectionAndRequests(boolean avoidBlocking) {
log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
}

private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
private void throwIfWaitCallbackTooLong(AppendRequestAndResponse requestWrapper) {
Instant timeToCheck = requestWrapper.requestSendTimeStamp;
if (timeToCheck == null) {
return;
}
Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException(
milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME);
milliSinceLastCallback,
writerId,
MAXIMUM_REQUEST_CALLBACK_WAIT_TIME,
requestWrapper.requestReceivedTime,
requestWrapper.placedInWaitingQueueTime,
requestWrapper.placedInInflightQueueTime,
requestWrapper.dispatchTimes);
}
}

Expand Down Expand Up @@ -1824,6 +1837,11 @@ static final class AppendRequestAndResponse {
// If a response is no longer expected this is set back to null.
Instant requestSendTimeStamp;

final Instant requestReceivedTime;
Instant placedInWaitingQueueTime;
Instant placedInInflightQueueTime;
final List<Instant> dispatchTimes = new ArrayList<>();

AppendRequestAndResponse(
AppendRowsRequest message,
StreamWriter streamWriter,
Expand Down Expand Up @@ -1852,10 +1870,12 @@ static final class AppendRequestAndResponse {
this.retryAlgorithm = null;
}
this.recordBatchRowCount = recordBatchRowCount;
this.requestReceivedTime = Instant.now();
}

void setRequestSendQueueTime() {
requestSendTimeStamp = Instant.now();
dispatchTimes.add(requestSendTimeStamp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package com.google.cloud.bigquery.storage.v1;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Comment thread
agrawal-siddharth marked this conversation as resolved.
Expand Down Expand Up @@ -428,17 +431,46 @@ public static class MaximumRequestCallbackWaitTimeExceededException extends Runt
private final Duration callbackWaitTime;
private final String writerId;
private final Duration callbackWaitTimeLimit;
private final Instant requestReceivedTime;
private final Instant placedInWaitingQueueTime;
private final Instant placedInInflightQueueTime;
private final ImmutableList<Instant> dispatchTimes;

@Deprecated
public MaximumRequestCallbackWaitTimeExceededException(
Duration callbackWaitTime, String writerId, Duration callbackWaitTimeLimit) {
this(callbackWaitTime, writerId, callbackWaitTimeLimit, null, null, null, ImmutableList.of());
}

public MaximumRequestCallbackWaitTimeExceededException(
Duration callbackWaitTime,
String writerId,
Duration callbackWaitTimeLimit,
@Nullable Instant requestReceivedTime,
@Nullable Instant placedInWaitingQueueTime,
@Nullable Instant placedInInflightQueueTime,
@Nullable List<Instant> dispatchTimes) {
super(
String.format(
"Request has waited in inflight queue for %sms for writer %s, "
+ "which is over maximum wait time %s",
callbackWaitTime, writerId, callbackWaitTimeLimit.toString()));
+ "which is over maximum wait time %s. "
+ "requestReceivedTime: %s, placedInWaitingQueueTime: %s, "
+ "placedInInflightQueueTime: %s, dispatchTimes: %s",
callbackWaitTime,
writerId,
callbackWaitTimeLimit.toString(),
requestReceivedTime,
placedInWaitingQueueTime,
placedInInflightQueueTime,
dispatchTimes));
this.callbackWaitTime = callbackWaitTime;
this.writerId = writerId;
this.callbackWaitTimeLimit = callbackWaitTimeLimit;
this.requestReceivedTime = requestReceivedTime;
this.placedInWaitingQueueTime = placedInWaitingQueueTime;
this.placedInInflightQueueTime = placedInInflightQueueTime;
this.dispatchTimes =
dispatchTimes == null ? ImmutableList.of() : ImmutableList.copyOf(dispatchTimes);
}

public Duration getCallbackWaitTime() {
Expand All @@ -452,6 +484,25 @@ public String getWriterId() {
public Duration getCallbackWaitTimeLimit() {
return callbackWaitTimeLimit;
}

@Nullable
public Instant getRequestReceivedTime() {
return requestReceivedTime;
}

@Nullable
public Instant getPlacedInWaitingQueueTime() {
return placedInWaitingQueueTime;
}

@Nullable
public Instant getPlacedInInflightQueueTime() {
return placedInInflightQueueTime;
}

public ImmutableList<Instant> getDispatchTimes() {
return dispatchTimes;
}
}

private Exceptions() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,8 +1440,25 @@ void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exceptio
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
if (i == 0) {
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
assertThat(ex.getCause()).hasMessageThat().contains("requestReceivedTime:");
assertThat(ex.getCause()).hasMessageThat().contains("placedInWaitingQueueTime:");
assertThat(ex.getCause()).hasMessageThat().contains("placedInInflightQueueTime:");
assertThat(ex.getCause()).hasMessageThat().contains("dispatchTimes:");
assertThat(ex.getCause())
.isInstanceOf(Exceptions.MaximumRequestCallbackWaitTimeExceededException.class);
Exceptions.MaximumRequestCallbackWaitTimeExceededException mace =
(Exceptions.MaximumRequestCallbackWaitTimeExceededException) ex.getCause();
assertThat(mace.getRequestReceivedTime()).isNotNull();
assertThat(mace.getPlacedInWaitingQueueTime()).isNotNull();
assertThat(mace.getPlacedInInflightQueueTime()).isNotNull();
assertThat(mace.getDispatchTimes()).isNotNull();
assertThat(mace.getDispatchTimes()).isNotEmpty();
assertThat(mace.getRequestReceivedTime().isAfter(mace.getPlacedInWaitingQueueTime()))
.isFalse();
assertThat(mace.getPlacedInWaitingQueueTime().isAfter(mace.getPlacedInInflightQueueTime()))
.isFalse();
assertThat(mace.getPlacedInInflightQueueTime().isAfter(mace.getDispatchTimes().get(0)))
.isFalse();
} else {
assertThat(ex.getCause())
.hasMessageThat()
Expand Down
Loading