Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{collections::HashMap, future::Future, marker::PhantomData, time::Durat

use temporalio_client::{
ClientKeepAliveOptions, ClientTlsOptions, Connection, ConnectionOptions,
DnsLoadBalancingOptions, HttpConnectProxyOptions, RetryOptions, TlsOptions,
errors::ClientConnectError,
DnsLoadBalancingOptions, GrpcCompression as CoreGrpcCompression, HttpConnectProxyOptions,
RetryOptions, TlsOptions, errors::ClientConnectError,
};

use magnus::{
Expand Down Expand Up @@ -98,6 +98,16 @@ impl Client {
.child(id!("rpc_retry"))?
.ok_or_else(|| error!("Missing rpc_retry"))?;
let tls = options.child(id!("tls"))?;
let grpc_compression = match options
.child(id!("grpc_compression"))?
.ok_or_else(|| error!("Missing grpc_compression"))?
.member::<String>(id!("codec"))?
.as_str()
{
"gzip" => CoreGrpcCompression::Gzip,
"none" => CoreGrpcCompression::None,
codec => return Err(error!("Invalid grpc compression codec: {}", codec)),
};
let metrics_meter = runtime.handle.core.telemetry().get_temporal_metric_meter();
let opts = ConnectionOptions::new(
Url::parse(
Expand All @@ -112,6 +122,7 @@ impl Client {
)
.client_name(options.member::<String>(id!("client_name"))?)
.client_version(options.member::<String>(id!("client_version"))?)
.grpc_compression(grpc_compression)
.headers(headers.headers)
.binary_headers(headers.binary_headers)
.maybe_api_key(options.member::<Option<String>>(id!("api_key"))?)
Expand Down
7 changes: 6 additions & 1 deletion temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class ListWorkflowPage; end # rubocop:disable Lint/EmptyClass
# @param dns_load_balancing [Connection::DnsLoadBalancingOptions, nil] DNS load balancing options for the
# connection. Default is +nil+ (disabled). Silently disabled when +http_connect_proxy+ is set, since the two are
# mutually exclusive.
# @param grpc_compression [Connection::GrpcCompressionOptions::Gzip, Connection::GrpcCompressionOptions::None]
# Transport-level gRPC compression. Defaults to gzip. Set to {Connection::GrpcCompressionOptions::None} to opt
# out.
#
# @return [Client] Connected client.
#
Expand All @@ -121,7 +124,8 @@ def self.connect(
http_connect_proxy: nil,
runtime: Runtime.default,
lazy_connect: false,
dns_load_balancing: nil
dns_load_balancing: nil,
grpc_compression: Connection::GrpcCompressionOptions::Gzip.new
)
# Prepare connection. The connection var is needed here so it can be used in callback for plugin.
base_connection = nil
Expand Down Expand Up @@ -168,6 +172,7 @@ def self.connect(
runtime:,
lazy_connect:,
dns_load_balancing:,
grpc_compression:,
around_connect: # steep:ignore
)

Expand Down
38 changes: 36 additions & 2 deletions temporalio/lib/temporalio/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class Connection
:http_connect_proxy,
:runtime,
:lazy_connect,
:dns_load_balancing
:dns_load_balancing,
:grpc_compression
)

# Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
Expand Down Expand Up @@ -149,6 +150,25 @@ def initialize(resolution_interval: 30.0)
end
end

# Transport-level gRPC compression options for client connections.
module GrpcCompressionOptions
# Use gzip for transport-level gRPC compression.
class Gzip
# @return [Symbol] Compression codec name.
def codec
:gzip
end
end

# Disable transport-level gRPC compression.
class None
# @return [Symbol] Compression codec name.
def codec
:none
end
end
end

# @return [Options] Frozen options for this client which has the same attributes as {initialize}. Note that if
# {api_key=} or {rpc_metadata=} are updated, the options object is replaced with those changes (it is not
# mutated in place).
Expand Down Expand Up @@ -188,6 +208,8 @@ def initialize(resolution_interval: 30.0)
# connection.
# @param dns_load_balancing [DnsLoadBalancingOptions, nil] DNS load balancing options for this connection. Default
# is +nil+ (disabled). Silently disabled when +http_connect_proxy+ is set, since the two are mutually exclusive.
# @param grpc_compression [GrpcCompressionOptions::Gzip, GrpcCompressionOptions::None] Transport-level gRPC
# compression. Defaults to gzip. Set to {GrpcCompressionOptions::None} to opt out.
# @param around_connect [Proc, nil] If present, this proc accepts two values: options and a block. The block must
# be yielded to only once with the options. The block does not return a meaningful value, nor should
# around_connect.
Expand All @@ -205,6 +227,7 @@ def initialize(
runtime: Runtime.default,
lazy_connect: false,
dns_load_balancing: nil,
grpc_compression: GrpcCompressionOptions::Gzip.new,
around_connect: nil
)
@options = Options.new(
Expand All @@ -218,7 +241,8 @@ def initialize(
http_connect_proxy:,
runtime:,
lazy_connect:,
dns_load_balancing:
dns_load_balancing:,
grpc_compression:
).freeze
@core_client_mutex = Mutex.new
# Create core client now if not lazy, applying around_connect if present
Expand Down Expand Up @@ -322,6 +346,16 @@ def new_core_client
),
identity: @options.identity || "#{Process.pid}@#{Socket.gethostname}"
)
grpc_compression_codec = case @options.grpc_compression
when GrpcCompressionOptions::Gzip
'gzip'
when GrpcCompressionOptions::None
'none'
else
raise ArgumentError,
"Invalid gRPC compression options: #{@options.grpc_compression.inspect}"
end
options.grpc_compression = Internal::Bridge::Client::GrpcCompressionOptions.new(codec: grpc_compression_codec)
# Auto-enable TLS when API key is provided and tls not explicitly set
tls = @options.tls
tls = true if tls.nil? && @options.api_key
Expand Down
7 changes: 6 additions & 1 deletion temporalio/lib/temporalio/internal/bridge/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class Client
:rpc_retry,
:keep_alive, # Optional
:http_connect_proxy, # Optional
:dns_load_balancing # Optional
:dns_load_balancing, # Optional
:grpc_compression
)

TLSOptions = Struct.new(
Expand Down Expand Up @@ -51,6 +52,10 @@ class Client
:resolution_interval
)

GrpcCompressionOptions = Struct.new(
:codec
)

def self.new(runtime, options)
queue = Queue.new
async_new(runtime, options, queue)
Expand Down
3 changes: 2 additions & 1 deletion temporalio/sig/temporalio/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ module Temporalio
?http_connect_proxy: Connection::HTTPConnectProxyOptions?,
?runtime: Runtime,
?lazy_connect: bool,
?dns_load_balancing: Connection::DnsLoadBalancingOptions?
?dns_load_balancing: Connection::DnsLoadBalancingOptions?,
?grpc_compression: Connection::GrpcCompressionOptions::Gzip | Connection::GrpcCompressionOptions::None
) -> Client

def self._validate_plugins!: (Array[Plugin] plugins) -> void
Expand Down
15 changes: 14 additions & 1 deletion temporalio/sig/temporalio/client/connection.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Temporalio
attr_reader runtime: Runtime
attr_reader lazy_connect: bool
attr_reader dns_load_balancing: DnsLoadBalancingOptions?
attr_reader grpc_compression: GrpcCompressionOptions::Gzip | GrpcCompressionOptions::None

def initialize: (
target_host: String,
Expand All @@ -25,7 +26,8 @@ module Temporalio
http_connect_proxy: HTTPConnectProxyOptions?,
runtime: Runtime,
lazy_connect: bool,
dns_load_balancing: DnsLoadBalancingOptions?
dns_load_balancing: DnsLoadBalancingOptions?,
grpc_compression: GrpcCompressionOptions::Gzip | GrpcCompressionOptions::None
) -> void

def to_h: -> Hash[Symbol, untyped]
Expand Down Expand Up @@ -95,6 +97,16 @@ module Temporalio
) -> void
end

module GrpcCompressionOptions
class Gzip
def codec: -> :gzip
end

class None
def codec: -> :none
end
end

attr_reader options: Options

attr_reader workflow_service: WorkflowService
Expand All @@ -113,6 +125,7 @@ module Temporalio
?runtime: Runtime,
?lazy_connect: bool,
?dns_load_balancing: DnsLoadBalancingOptions?,
?grpc_compression: GrpcCompressionOptions::Gzip | GrpcCompressionOptions::None,
?around_connect: nil | ^(Options) { (Options) -> void } -> void
) -> void

Expand Down
12 changes: 11 additions & 1 deletion temporalio/sig/temporalio/internal/bridge/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Temporalio
attr_accessor keep_alive: KeepAliveOptions?
attr_accessor http_connect_proxy: HTTPConnectProxyOptions?
attr_accessor dns_load_balancing: DnsLoadBalancingOptions?
attr_accessor grpc_compression: GrpcCompressionOptions?

def initialize: (
target_host: String,
Expand All @@ -26,7 +27,8 @@ module Temporalio
rpc_retry: RPCRetryOptions,
?keep_alive: KeepAliveOptions?,
?http_connect_proxy: HTTPConnectProxyOptions?,
?dns_load_balancing: DnsLoadBalancingOptions?
?dns_load_balancing: DnsLoadBalancingOptions?,
?grpc_compression: GrpcCompressionOptions?
) -> void
end

Expand Down Expand Up @@ -92,6 +94,14 @@ module Temporalio
) -> void
end

class GrpcCompressionOptions
attr_accessor codec: String

def initialize: (
codec: String
) -> void
end

# Defined in Rust

SERVICE_WORKFLOW: Integer
Expand Down
15 changes: 15 additions & 0 deletions temporalio/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ def test_dns_load_balancing_default_interval
assert_in_delta 30.0, opts.resolution_interval
end

def test_grpc_compression_defaults_to_gzip
client = Temporalio::Client.connect('localhost:7233', 'default', lazy_connect: true)
assert_instance_of Temporalio::Client::Connection::GrpcCompressionOptions::Gzip,
client.connection.options.grpc_compression
assert_equal :gzip, client.connection.options.grpc_compression.codec
end

def test_grpc_compression_none_preserved
compression_opts = Temporalio::Client::Connection::GrpcCompressionOptions::None.new
client = Temporalio::Client.connect(
'localhost:7233', 'default', lazy_connect: true, grpc_compression: compression_opts
)
assert_equal compression_opts, client.connection.options.grpc_compression
end

class TrackCallsInterceptor
include Temporalio::Client::Interceptor

Expand Down
Loading