diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java index 89956c32b..20b4ec336 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java @@ -22,8 +22,8 @@ import org.junit.Test; public class GrpcMessageTooLargeTest { - private static final String QUERY_ERROR_MESSAGE = - "Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max"; + // This string is kept intentionally short to match multiple possible too-large error messages + private static final String TOO_BIG_ERR_MESSAGE = "larger than max"; private static final String VERY_LARGE_DATA; static { @@ -120,7 +120,7 @@ public void queryResultTooLarge() { assertNotNull(e.getCause()); // The exception will not contain the original failure object, so instead of type check we're // checking the message to ensure the correct error is being sent. - assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE)); + assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE)); } @Test @@ -132,7 +132,7 @@ public void queryErrorTooLarge() { WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query); assertNotNull(e.getCause()); - assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE)); + assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE)); } private static T createWorkflowStub(Class clazz, SDKTestWorkflowRule workflowRule) { diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java index da39c740f..2f5c87397 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java @@ -161,6 +161,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) { return ClientInterceptors.intercept( channel, + new GrpcCompressionInterceptor(options.getGrpcCompression()), MetadataUtils.newAttachHeadersInterceptor(headers), new SystemInfoInterceptor(serverCapabilitiesFuture)); } @@ -206,6 +207,8 @@ private ManagedChannel prepareChannel() { builder.useTransportSecurity(); } + builder.decompressorRegistry(options.getGrpcCompression().getDecompressorRegistry()); + // Disable built-in idleTimer until https://github.com/grpc/grpc-java/issues/8714 is resolved. // jsdk force-idles channels often anyway, so this is not needed until we stop doing // force-idling as a part of diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java new file mode 100644 index 000000000..861c950cd --- /dev/null +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java @@ -0,0 +1,31 @@ +package io.temporal.serviceclient; + +import io.grpc.Codec; +import io.grpc.DecompressorRegistry; +import javax.annotation.Nullable; + +/** Selects transport-level gRPC compression for service calls. */ +public enum GrpcCompression { + /** Do not compress requests or advertise support for compressed responses. */ + NONE(null, DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false)), + + /** Gzip-compress outbound requests and accept gzip-compressed responses. */ + GZIP("gzip", DecompressorRegistry.getDefaultInstance()); + + private final @Nullable String compressorName; + private final DecompressorRegistry decompressorRegistry; + + GrpcCompression(@Nullable String compressorName, DecompressorRegistry decompressorRegistry) { + this.compressorName = compressorName; + this.decompressorRegistry = decompressorRegistry; + } + + @Nullable + String getCompressorName() { + return compressorName; + } + + DecompressorRegistry getDecompressorRegistry() { + return decompressorRegistry; + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java new file mode 100644 index 000000000..4d605ab33 --- /dev/null +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java @@ -0,0 +1,21 @@ +package io.temporal.serviceclient; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; + +final class GrpcCompressionInterceptor implements ClientInterceptor { + private final GrpcCompression compression; + + GrpcCompressionInterceptor(GrpcCompression compression) { + this.compression = compression; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions.withCompression(compression.getCompressorName())); + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java index 0c3d8ae3a..36e3afcbd 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java @@ -114,6 +114,9 @@ public class ServiceStubsOptions { protected final Scope metricsScope; + /** Transport-level gRPC compression. */ + protected final GrpcCompression grpcCompression; + ServiceStubsOptions(ServiceStubsOptions that) { this.channel = that.channel; this.target = that.target; @@ -135,6 +138,7 @@ public class ServiceStubsOptions { this.grpcMetadataProviders = that.grpcMetadataProviders; this.grpcClientInterceptors = that.grpcClientInterceptors; this.metricsScope = that.metricsScope; + this.grpcCompression = that.grpcCompression; } ServiceStubsOptions( @@ -157,7 +161,8 @@ public class ServiceStubsOptions { Metadata headers, Collection grpcMetadataProviders, Collection grpcClientInterceptors, - Scope metricsScope) { + Scope metricsScope, + GrpcCompression grpcCompression) { this.channel = channel; this.target = target; this.channelInitializer = channelInitializer; @@ -178,6 +183,7 @@ public class ServiceStubsOptions { this.grpcMetadataProviders = grpcMetadataProviders; this.grpcClientInterceptors = grpcClientInterceptors; this.metricsScope = metricsScope; + this.grpcCompression = grpcCompression; } /** @@ -342,6 +348,15 @@ public Scope getMetricsScope() { return metricsScope; } + /** + * @return transport-level gRPC compression used for requests and response negotiation. + * @see Builder#setGrpcCompression(GrpcCompression) + */ + @Nonnull + public GrpcCompression getGrpcCompression() { + return grpcCompression; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -366,7 +381,8 @@ public boolean equals(Object o) { && Objects.equals(headers, that.headers) && Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders) && Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors) - && Objects.equals(metricsScope, that.metricsScope); + && Objects.equals(metricsScope, that.metricsScope) + && grpcCompression == that.grpcCompression; } @Override @@ -391,7 +407,8 @@ public int hashCode() { headers, grpcMetadataProviders, grpcClientInterceptors, - metricsScope); + metricsScope, + grpcCompression); } @Override @@ -436,6 +453,8 @@ public String toString() { + grpcClientInterceptors + ", metricsScope=" + metricsScope + + ", grpcCompression=" + + grpcCompression + '}'; } @@ -460,6 +479,7 @@ public static class Builder> { private Collection grpcClientInterceptors; private Scope metricsScope; private boolean apiKeyProvided; + private GrpcCompression grpcCompression = GrpcCompression.GZIP; protected Builder() {} @@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) { ? new ArrayList<>(options.grpcClientInterceptors) : null; this.metricsScope = options.metricsScope; + this.grpcCompression = options.grpcCompression; } /** @@ -720,6 +741,22 @@ public T setMetricsScope(Scope metricsScope) { return self(); } + /** + * Sets transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set to + * {@link GrpcCompression#NONE} to opt out. + * + *

For SDK-created channels, this controls both request compression and the {@code + * grpc-accept-encoding} response negotiation header. For user-supplied channels, the SDK still + * controls request compression, but response decompression negotiation is configured by the + * supplied channel. + * + * @return {@code this} + */ + public T setGrpcCompression(GrpcCompression grpcCompression) { + this.grpcCompression = Objects.requireNonNull(grpcCompression); + return self(); + } + /** * Set the time to wait between service responses on each health check. * @@ -853,7 +890,8 @@ public ServiceStubsOptions build() { this.headers, this.grpcMetadataProviders, this.grpcClientInterceptors, - this.metricsScope); + this.metricsScope, + this.grpcCompression); } public ServiceStubsOptions validateAndBuildWithDefaults() { @@ -916,7 +954,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() { headers, grpcMetadataProviders, grpcClientInterceptors, - metricsScope); + metricsScope, + this.grpcCompression); } } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java new file mode 100644 index 000000000..6c7d71acb --- /dev/null +++ b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java @@ -0,0 +1,91 @@ +package io.temporal.serviceclient; + +import static org.junit.Assert.*; + +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.temporal.api.workflowservice.v1.GetSystemInfoRequest; +import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Rule; +import org.junit.Test; + +public class GrpcCompressionTest { + private static final Metadata.Key GRPC_ENCODING = + Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key GRPC_ACCEPT_ENCODING = + Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER); + + @Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + + @Test + public void gzipCompressionSendsAndAcceptsGzip() throws Exception { + Metadata headers = callGetSystemInfo(GrpcCompression.GZIP); + + assertEquals("gzip", headers.get(GRPC_ENCODING)); + assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip")); + } + + @Test + public void noneCompressionDoesNotSendOrAcceptGzip() throws Exception { + Metadata headers = callGetSystemInfo(GrpcCompression.NONE); + + assertNull(headers.get(GRPC_ENCODING)); + assertNull(headers.get(GRPC_ACCEPT_ENCODING)); + } + + private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception { + AtomicReference capturedHeaders = new AtomicReference<>(); + ServerInterceptor captureHeadersInterceptor = + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + capturedHeaders.set(headers); + return next.startCall(call, headers); + } + }; + Server server = + grpcCleanupRule.register( + NettyServerBuilder.forPort(0) + .addService( + ServerInterceptors.intercept( + new TestWorkflowService(), captureHeadersInterceptor)) + .build() + .start()); + + WorkflowServiceStubs serviceStubs = + WorkflowServiceStubs.newServiceStubs( + WorkflowServiceStubsOptions.newBuilder() + .setTarget("127.0.0.1:" + server.getPort()) + .setEnableHttps(false) + .setGrpcCompression(compression) + .build()); + try { + serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance()); + } finally { + serviceStubs.shutdownNow(); + } + + assertNotNull(capturedHeaders.get()); + return capturedHeaders.get(); + } + + private static final class TestWorkflowService + extends WorkflowServiceGrpc.WorkflowServiceImplBase { + @Override + public void getSystemInfo( + GetSystemInfoRequest request, StreamObserver responseObserver) { + responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } +} diff --git a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java index ccadbffe8..0c0e76d74 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java @@ -151,4 +151,30 @@ public void testSpringBootStyleAutoTLSWithApiKey() { "TLS should be disabled when no API key and no explicit TLS setting", options3.getEnableHttps()); } + + @Test + public void testGrpcCompressionDefaultsToGzip() { + ServiceStubsOptions options = + WorkflowServiceStubsOptions.newBuilder() + .setTarget("localhost:7233") + .validateAndBuildWithDefaults(); + + assertEquals(GrpcCompression.GZIP, options.getGrpcCompression()); + } + + @Test + public void testGrpcCompressionNonePassesThroughBuilderCopy() { + ServiceStubsOptions options = + WorkflowServiceStubsOptions.newBuilder() + .setTarget("localhost:7233") + .setGrpcCompression(GrpcCompression.NONE) + .validateAndBuildWithDefaults(); + + assertEquals(GrpcCompression.NONE, options.getGrpcCompression()); + + ServiceStubsOptions copied = + WorkflowServiceStubsOptions.newBuilder(options).validateAndBuildWithDefaults(); + + assertEquals(GrpcCompression.NONE, copied.getGrpcCompression()); + } }