From 4dbf7a5f6e7b20308f4616d96e3be280ba02771b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 26 Jun 2026 15:08:49 -0700 Subject: [PATCH] Expose gRPC compression options --- temporalio/ext/src/client.rs | 15 +++++++- temporalio/lib/temporalio/client.rb | 7 +++- .../lib/temporalio/client/connection.rb | 38 ++++++++++++++++++- .../lib/temporalio/internal/bridge/client.rb | 7 +++- temporalio/sig/temporalio/client.rbs | 3 +- .../sig/temporalio/client/connection.rbs | 15 +++++++- .../sig/temporalio/internal/bridge/client.rbs | 12 +++++- temporalio/test/client_test.rb | 15 ++++++++ 8 files changed, 103 insertions(+), 9 deletions(-) diff --git a/temporalio/ext/src/client.rs b/temporalio/ext/src/client.rs index 2efa5c34..cecb2855 100644 --- a/temporalio/ext/src/client.rs +++ b/temporalio/ext/src/client.rs @@ -2,8 +2,8 @@ use std::{collections::HashMap, future::Future, marker::PhantomData, time::Durat use temporalio_client::{ ClientKeepAliveOptions, ClientTlsOptions, Connection, ConnectionOptions, - DnsLoadBalancingOptions, HttpConnectProxyOptions, RetryOptions, TlsOptions, - errors::ClientConnectError, + DnsLoadBalancingOptions, GrpcCompression as CoreGrpcCompression, HttpConnectProxyOptions, + RetryOptions, TlsOptions, errors::ClientConnectError, }; use magnus::{ @@ -98,6 +98,16 @@ impl Client { .child(id!("rpc_retry"))? .ok_or_else(|| error!("Missing rpc_retry"))?; let tls = options.child(id!("tls"))?; + let grpc_compression = match options + .child(id!("grpc_compression"))? + .ok_or_else(|| error!("Missing grpc_compression"))? + .member::(id!("codec"))? + .as_str() + { + "gzip" => CoreGrpcCompression::Gzip, + "none" => CoreGrpcCompression::None, + codec => return Err(error!("Invalid grpc compression codec: {}", codec)), + }; let metrics_meter = runtime.handle.core.telemetry().get_temporal_metric_meter(); let opts = ConnectionOptions::new( Url::parse( @@ -112,6 +122,7 @@ impl Client { ) .client_name(options.member::(id!("client_name"))?) .client_version(options.member::(id!("client_version"))?) + .grpc_compression(grpc_compression) .headers(headers.headers) .binary_headers(headers.binary_headers) .maybe_api_key(options.member::>(id!("api_key"))?) diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index 9ca07f09..77880c55 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -99,6 +99,9 @@ class ListWorkflowPage; end # rubocop:disable Lint/EmptyClass # @param dns_load_balancing [Connection::DnsLoadBalancingOptions, nil] DNS load balancing options for the # connection. Default is +nil+ (disabled). Silently disabled when +http_connect_proxy+ is set, since the two are # mutually exclusive. + # @param grpc_compression [Connection::GrpcCompressionOptions::Gzip, Connection::GrpcCompressionOptions::None] + # Transport-level gRPC compression. Defaults to gzip. Set to {Connection::GrpcCompressionOptions::None} to opt + # out. # # @return [Client] Connected client. # @@ -121,7 +124,8 @@ def self.connect( http_connect_proxy: nil, runtime: Runtime.default, lazy_connect: false, - dns_load_balancing: nil + dns_load_balancing: nil, + grpc_compression: Connection::GrpcCompressionOptions::Gzip.new ) # Prepare connection. The connection var is needed here so it can be used in callback for plugin. base_connection = nil @@ -168,6 +172,7 @@ def self.connect( runtime:, lazy_connect:, dns_load_balancing:, + grpc_compression:, around_connect: # steep:ignore ) diff --git a/temporalio/lib/temporalio/client/connection.rb b/temporalio/lib/temporalio/client/connection.rb index a74f690e..b10dc552 100644 --- a/temporalio/lib/temporalio/client/connection.rb +++ b/temporalio/lib/temporalio/client/connection.rb @@ -25,7 +25,8 @@ class Connection :http_connect_proxy, :runtime, :lazy_connect, - :dns_load_balancing + :dns_load_balancing, + :grpc_compression ) # Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details. @@ -149,6 +150,25 @@ def initialize(resolution_interval: 30.0) end end + # Transport-level gRPC compression options for client connections. + module GrpcCompressionOptions + # Use gzip for transport-level gRPC compression. + class Gzip + # @return [Symbol] Compression codec name. + def codec + :gzip + end + end + + # Disable transport-level gRPC compression. + class None + # @return [Symbol] Compression codec name. + def codec + :none + end + end + end + # @return [Options] Frozen options for this client which has the same attributes as {initialize}. Note that if # {api_key=} or {rpc_metadata=} are updated, the options object is replaced with those changes (it is not # mutated in place). @@ -188,6 +208,8 @@ def initialize(resolution_interval: 30.0) # connection. # @param dns_load_balancing [DnsLoadBalancingOptions, nil] DNS load balancing options for this connection. Default # is +nil+ (disabled). Silently disabled when +http_connect_proxy+ is set, since the two are mutually exclusive. + # @param grpc_compression [GrpcCompressionOptions::Gzip, GrpcCompressionOptions::None] Transport-level gRPC + # compression. Defaults to gzip. Set to {GrpcCompressionOptions::None} to opt out. # @param around_connect [Proc, nil] If present, this proc accepts two values: options and a block. The block must # be yielded to only once with the options. The block does not return a meaningful value, nor should # around_connect. @@ -205,6 +227,7 @@ def initialize( runtime: Runtime.default, lazy_connect: false, dns_load_balancing: nil, + grpc_compression: GrpcCompressionOptions::Gzip.new, around_connect: nil ) @options = Options.new( @@ -218,7 +241,8 @@ def initialize( http_connect_proxy:, runtime:, lazy_connect:, - dns_load_balancing: + dns_load_balancing:, + grpc_compression: ).freeze @core_client_mutex = Mutex.new # Create core client now if not lazy, applying around_connect if present @@ -322,6 +346,16 @@ def new_core_client ), identity: @options.identity || "#{Process.pid}@#{Socket.gethostname}" ) + grpc_compression_codec = case @options.grpc_compression + when GrpcCompressionOptions::Gzip + 'gzip' + when GrpcCompressionOptions::None + 'none' + else + raise ArgumentError, + "Invalid gRPC compression options: #{@options.grpc_compression.inspect}" + end + options.grpc_compression = Internal::Bridge::Client::GrpcCompressionOptions.new(codec: grpc_compression_codec) # Auto-enable TLS when API key is provided and tls not explicitly set tls = @options.tls tls = true if tls.nil? && @options.api_key diff --git a/temporalio/lib/temporalio/internal/bridge/client.rb b/temporalio/lib/temporalio/internal/bridge/client.rb index 01bdfc0e..d00c051e 100644 --- a/temporalio/lib/temporalio/internal/bridge/client.rb +++ b/temporalio/lib/temporalio/internal/bridge/client.rb @@ -17,7 +17,8 @@ class Client :rpc_retry, :keep_alive, # Optional :http_connect_proxy, # Optional - :dns_load_balancing # Optional + :dns_load_balancing, # Optional + :grpc_compression ) TLSOptions = Struct.new( @@ -51,6 +52,10 @@ class Client :resolution_interval ) + GrpcCompressionOptions = Struct.new( + :codec + ) + def self.new(runtime, options) queue = Queue.new async_new(runtime, options, queue) diff --git a/temporalio/sig/temporalio/client.rbs b/temporalio/sig/temporalio/client.rbs index 2627acc6..3edb2cbf 100644 --- a/temporalio/sig/temporalio/client.rbs +++ b/temporalio/sig/temporalio/client.rbs @@ -51,7 +51,8 @@ module Temporalio ?http_connect_proxy: Connection::HTTPConnectProxyOptions?, ?runtime: Runtime, ?lazy_connect: bool, - ?dns_load_balancing: Connection::DnsLoadBalancingOptions? + ?dns_load_balancing: Connection::DnsLoadBalancingOptions?, + ?grpc_compression: Connection::GrpcCompressionOptions::Gzip | Connection::GrpcCompressionOptions::None ) -> Client def self._validate_plugins!: (Array[Plugin] plugins) -> void diff --git a/temporalio/sig/temporalio/client/connection.rbs b/temporalio/sig/temporalio/client/connection.rbs index 1e2ed35f..14e3481e 100644 --- a/temporalio/sig/temporalio/client/connection.rbs +++ b/temporalio/sig/temporalio/client/connection.rbs @@ -13,6 +13,7 @@ module Temporalio attr_reader runtime: Runtime attr_reader lazy_connect: bool attr_reader dns_load_balancing: DnsLoadBalancingOptions? + attr_reader grpc_compression: GrpcCompressionOptions::Gzip | GrpcCompressionOptions::None def initialize: ( target_host: String, @@ -25,7 +26,8 @@ module Temporalio http_connect_proxy: HTTPConnectProxyOptions?, runtime: Runtime, lazy_connect: bool, - dns_load_balancing: DnsLoadBalancingOptions? + dns_load_balancing: DnsLoadBalancingOptions?, + grpc_compression: GrpcCompressionOptions::Gzip | GrpcCompressionOptions::None ) -> void def to_h: -> Hash[Symbol, untyped] @@ -95,6 +97,16 @@ module Temporalio ) -> void end + module GrpcCompressionOptions + class Gzip + def codec: -> :gzip + end + + class None + def codec: -> :none + end + end + attr_reader options: Options attr_reader workflow_service: WorkflowService @@ -113,6 +125,7 @@ module Temporalio ?runtime: Runtime, ?lazy_connect: bool, ?dns_load_balancing: DnsLoadBalancingOptions?, + ?grpc_compression: GrpcCompressionOptions::Gzip | GrpcCompressionOptions::None, ?around_connect: nil | ^(Options) { (Options) -> void } -> void ) -> void diff --git a/temporalio/sig/temporalio/internal/bridge/client.rbs b/temporalio/sig/temporalio/internal/bridge/client.rbs index 8d5cd36d..eb9a6415 100644 --- a/temporalio/sig/temporalio/internal/bridge/client.rbs +++ b/temporalio/sig/temporalio/internal/bridge/client.rbs @@ -14,6 +14,7 @@ module Temporalio attr_accessor keep_alive: KeepAliveOptions? attr_accessor http_connect_proxy: HTTPConnectProxyOptions? attr_accessor dns_load_balancing: DnsLoadBalancingOptions? + attr_accessor grpc_compression: GrpcCompressionOptions? def initialize: ( target_host: String, @@ -26,7 +27,8 @@ module Temporalio rpc_retry: RPCRetryOptions, ?keep_alive: KeepAliveOptions?, ?http_connect_proxy: HTTPConnectProxyOptions?, - ?dns_load_balancing: DnsLoadBalancingOptions? + ?dns_load_balancing: DnsLoadBalancingOptions?, + ?grpc_compression: GrpcCompressionOptions? ) -> void end @@ -92,6 +94,14 @@ module Temporalio ) -> void end + class GrpcCompressionOptions + attr_accessor codec: String + + def initialize: ( + codec: String + ) -> void + end + # Defined in Rust SERVICE_WORKFLOW: Integer diff --git a/temporalio/test/client_test.rb b/temporalio/test/client_test.rb index f1c7af4f..894beb0b 100644 --- a/temporalio/test/client_test.rb +++ b/temporalio/test/client_test.rb @@ -49,6 +49,21 @@ def test_dns_load_balancing_default_interval assert_in_delta 30.0, opts.resolution_interval end + def test_grpc_compression_defaults_to_gzip + client = Temporalio::Client.connect('localhost:7233', 'default', lazy_connect: true) + assert_instance_of Temporalio::Client::Connection::GrpcCompressionOptions::Gzip, + client.connection.options.grpc_compression + assert_equal :gzip, client.connection.options.grpc_compression.codec + end + + def test_grpc_compression_none_preserved + compression_opts = Temporalio::Client::Connection::GrpcCompressionOptions::None.new + client = Temporalio::Client.connect( + 'localhost:7233', 'default', lazy_connect: true, grpc_compression: compression_opts + ) + assert_equal compression_opts, client.connection.options.grpc_compression + end + class TrackCallsInterceptor include Temporalio::Client::Interceptor