Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.junit.Test;

public class GrpcMessageTooLargeTest {
private static final String QUERY_ERROR_MESSAGE =
"Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max";
// This string is kept intentionally short to match multiple possible too-large error messages
private static final String TOO_BIG_ERR_MESSAGE = "larger than max";
private static final String VERY_LARGE_DATA;

static {
Expand Down Expand Up @@ -120,7 +120,7 @@ public void queryResultTooLarge() {
assertNotNull(e.getCause());
// The exception will not contain the original failure object, so instead of type check we're
// checking the message to ensure the correct error is being sent.
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE));
}

@Test
Expand All @@ -132,7 +132,7 @@ public void queryErrorTooLarge() {
WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query);

assertNotNull(e.getCause());
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE));
}

private static <T> T createWorkflowStub(Class<T> clazz, SDKTestWorkflowRule workflowRule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) {

return ClientInterceptors.intercept(
channel,
new GrpcCompressionInterceptor(options.getGrpcCompression()),
MetadataUtils.newAttachHeadersInterceptor(headers),
new SystemInfoInterceptor(serverCapabilitiesFuture));
}
Expand Down Expand Up @@ -206,6 +207,8 @@ private ManagedChannel prepareChannel() {
builder.useTransportSecurity();
}

builder.decompressorRegistry(options.getGrpcCompression().getDecompressorRegistry());

// Disable built-in idleTimer until https://github.com/grpc/grpc-java/issues/8714 is resolved.
// jsdk force-idles channels often anyway, so this is not needed until we stop doing
// force-idling as a part of
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.temporal.serviceclient;

import io.grpc.Codec;
import io.grpc.DecompressorRegistry;
import javax.annotation.Nullable;

/** Selects transport-level gRPC compression for service calls. */
public enum GrpcCompression {
/** Do not compress requests or advertise support for compressed responses. */
NONE(null, DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false)),

/** Gzip-compress outbound requests and accept gzip-compressed responses. */
GZIP("gzip", DecompressorRegistry.getDefaultInstance());

private final @Nullable String compressorName;
private final DecompressorRegistry decompressorRegistry;

GrpcCompression(@Nullable String compressorName, DecompressorRegistry decompressorRegistry) {
this.compressorName = compressorName;
this.decompressorRegistry = decompressorRegistry;
}

@Nullable
String getCompressorName() {
return compressorName;
}

DecompressorRegistry getDecompressorRegistry() {
return decompressorRegistry;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.temporal.serviceclient;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

final class GrpcCompressionInterceptor implements ClientInterceptor {
private final GrpcCompression compression;

GrpcCompressionInterceptor(GrpcCompression compression) {
this.compression = compression;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return next.newCall(method, callOptions.withCompression(compression.getCompressorName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public class ServiceStubsOptions {

protected final Scope metricsScope;

/** Transport-level gRPC compression. */
protected final GrpcCompression grpcCompression;

ServiceStubsOptions(ServiceStubsOptions that) {
this.channel = that.channel;
this.target = that.target;
Expand All @@ -135,6 +138,7 @@ public class ServiceStubsOptions {
this.grpcMetadataProviders = that.grpcMetadataProviders;
this.grpcClientInterceptors = that.grpcClientInterceptors;
this.metricsScope = that.metricsScope;
this.grpcCompression = that.grpcCompression;
}

ServiceStubsOptions(
Expand All @@ -157,7 +161,8 @@ public class ServiceStubsOptions {
Metadata headers,
Collection<GrpcMetadataProvider> grpcMetadataProviders,
Collection<ClientInterceptor> grpcClientInterceptors,
Scope metricsScope) {
Scope metricsScope,
GrpcCompression grpcCompression) {
this.channel = channel;
this.target = target;
this.channelInitializer = channelInitializer;
Expand All @@ -178,6 +183,7 @@ public class ServiceStubsOptions {
this.grpcMetadataProviders = grpcMetadataProviders;
this.grpcClientInterceptors = grpcClientInterceptors;
this.metricsScope = metricsScope;
this.grpcCompression = grpcCompression;
}

/**
Expand Down Expand Up @@ -342,6 +348,15 @@ public Scope getMetricsScope() {
return metricsScope;
}

/**
* @return transport-level gRPC compression used for requests and response negotiation.
* @see Builder#setGrpcCompression(GrpcCompression)
*/
@Nonnull
public GrpcCompression getGrpcCompression() {
return grpcCompression;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -366,7 +381,8 @@ public boolean equals(Object o) {
&& Objects.equals(headers, that.headers)
&& Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders)
&& Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors)
&& Objects.equals(metricsScope, that.metricsScope);
&& Objects.equals(metricsScope, that.metricsScope)
&& grpcCompression == that.grpcCompression;
}

@Override
Expand All @@ -391,7 +407,8 @@ public int hashCode() {
headers,
grpcMetadataProviders,
grpcClientInterceptors,
metricsScope);
metricsScope,
grpcCompression);
}

@Override
Expand Down Expand Up @@ -436,6 +453,8 @@ public String toString() {
+ grpcClientInterceptors
+ ", metricsScope="
+ metricsScope
+ ", grpcCompression="
+ grpcCompression
+ '}';
}

Expand All @@ -460,6 +479,7 @@ public static class Builder<T extends Builder<T>> {
private Collection<ClientInterceptor> grpcClientInterceptors;
private Scope metricsScope;
private boolean apiKeyProvided;
private GrpcCompression grpcCompression = GrpcCompression.GZIP;

protected Builder() {}

Expand Down Expand Up @@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) {
? new ArrayList<>(options.grpcClientInterceptors)
: null;
this.metricsScope = options.metricsScope;
this.grpcCompression = options.grpcCompression;
}

/**
Expand Down Expand Up @@ -720,6 +741,22 @@ public T setMetricsScope(Scope metricsScope) {
return self();
}

/**
* Sets transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set to
* {@link GrpcCompression#NONE} to opt out.
*
* <p>For SDK-created channels, this controls both request compression and the {@code
* grpc-accept-encoding} response negotiation header. For user-supplied channels, the SDK still
* controls request compression, but response decompression negotiation is configured by the
* supplied channel.
*
* @return {@code this}
*/
public T setGrpcCompression(GrpcCompression grpcCompression) {
this.grpcCompression = Objects.requireNonNull(grpcCompression);
return self();
}

/**
* Set the time to wait between service responses on each health check.
*
Expand Down Expand Up @@ -853,7 +890,8 @@ public ServiceStubsOptions build() {
this.headers,
this.grpcMetadataProviders,
this.grpcClientInterceptors,
this.metricsScope);
this.metricsScope,
this.grpcCompression);
}

public ServiceStubsOptions validateAndBuildWithDefaults() {
Expand Down Expand Up @@ -916,7 +954,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() {
headers,
grpcMetadataProviders,
grpcClientInterceptors,
metricsScope);
metricsScope,
this.grpcCompression);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.temporal.serviceclient;

import static org.junit.Assert.*;

import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Rule;
import org.junit.Test;

public class GrpcCompressionTest {
private static final Metadata.Key<String> GRPC_ENCODING =
Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> GRPC_ACCEPT_ENCODING =
Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER);

@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();

@Test
public void gzipCompressionSendsAndAcceptsGzip() throws Exception {
Metadata headers = callGetSystemInfo(GrpcCompression.GZIP);

assertEquals("gzip", headers.get(GRPC_ENCODING));
assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip"));
}

@Test
public void noneCompressionDoesNotSendOrAcceptGzip() throws Exception {
Metadata headers = callGetSystemInfo(GrpcCompression.NONE);

assertNull(headers.get(GRPC_ENCODING));
assertNull(headers.get(GRPC_ACCEPT_ENCODING));
}

private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception {
AtomicReference<Metadata> capturedHeaders = new AtomicReference<>();
ServerInterceptor captureHeadersInterceptor =
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
capturedHeaders.set(headers);
return next.startCall(call, headers);
}
};
Server server =
grpcCleanupRule.register(
NettyServerBuilder.forPort(0)
.addService(
ServerInterceptors.intercept(
new TestWorkflowService(), captureHeadersInterceptor))
.build()
.start());

WorkflowServiceStubs serviceStubs =
WorkflowServiceStubs.newServiceStubs(
WorkflowServiceStubsOptions.newBuilder()
.setTarget("127.0.0.1:" + server.getPort())
.setEnableHttps(false)
.setGrpcCompression(compression)
.build());
try {
serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance());
} finally {
serviceStubs.shutdownNow();
}

assertNotNull(capturedHeaders.get());
return capturedHeaders.get();
}

private static final class TestWorkflowService
extends WorkflowServiceGrpc.WorkflowServiceImplBase {
@Override
public void getSystemInfo(
GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance());
responseObserver.onCompleted();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,30 @@ public void testSpringBootStyleAutoTLSWithApiKey() {
"TLS should be disabled when no API key and no explicit TLS setting",
options3.getEnableHttps());
}

@Test
public void testGrpcCompressionDefaultsToGzip() {
ServiceStubsOptions options =
WorkflowServiceStubsOptions.newBuilder()
.setTarget("localhost:7233")
.validateAndBuildWithDefaults();

assertEquals(GrpcCompression.GZIP, options.getGrpcCompression());
}

@Test
public void testGrpcCompressionNonePassesThroughBuilderCopy() {
ServiceStubsOptions options =
WorkflowServiceStubsOptions.newBuilder()
.setTarget("localhost:7233")
.setGrpcCompression(GrpcCompression.NONE)
.validateAndBuildWithDefaults();

assertEquals(GrpcCompression.NONE, options.getGrpcCompression());

ServiceStubsOptions copied =
WorkflowServiceStubsOptions.newBuilder(options).validateAndBuildWithDefaults();

assertEquals(GrpcCompression.NONE, copied.getGrpcCompression());
}
}
Loading