Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ld-eventsource.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'webrick', '~> 1.7'

spec.add_runtime_dependency 'concurrent-ruby', '~> 1.0'
spec.add_runtime_dependency 'http', '>= 4.4.1', '< 6.0.0'
spec.add_runtime_dependency 'http', '>= 4.4.1', '< 7.0.0'
end
44 changes: 31 additions & 13 deletions lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ class Client
# The default HTTP method for requests.
DEFAULT_HTTP_METHOD = "GET"

# TODO(breaking): Remove this filtering once we have updated to the next major breaking version.
# HTTP v6 requires keyword arguments instead of an options hash, so we filter to only known valid
# arguments to avoid passing unsupported options.
VALID_HTTP_CLIENT_OPTIONS = %i[
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsonbailey here is the approach we were discussing.

The only issue I found was that v4 and v5 support cookies as a parameter, but v6 dropped that. I don't include cookies in the list of allowable options since it would hard break in v6. However, I do log whenever we drop an option. Maybe that's a sufficient middle ground?

base_uri body encoding features follow form headers json keep_alive_timeout
nodelay params persistent proxy response retriable socket_class ssl_context
ssl ssl_socket_class timeout_class timeout_options
].freeze
Comment thread
cursor[bot] marked this conversation as resolved.

#
# Creates a new SSE client.
#
Expand Down Expand Up @@ -118,15 +127,15 @@ def initialize(uri,
@retry_enabled = retry_enabled

@headers = headers.clone
@connect_timeout = connect_timeout
@read_timeout = read_timeout
@connect_timeout = connect_timeout&.to_f
@read_timeout = read_timeout&.to_f
@method = method.to_s.upcase
@payload = payload
@logger = logger || default_logger

base_http_client_options = {}
if socket_factory
base_http_client_options["socket_class"] = socket_factory
base_http_client_options[:socket_class] = socket_factory
end

if proxy
Expand All @@ -139,22 +148,29 @@ def initialize(uri,
end

if @proxy
base_http_client_options["proxy"] = {
base_http_client_options[:proxy] = {
:proxy_address => @proxy.host,
:proxy_port => @proxy.port,
}
base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.nil?
base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.nil?
base_http_client_options[:proxy][:proxy_username] = @proxy.user unless @proxy.user.nil?
base_http_client_options[:proxy][:proxy_password] = @proxy.password unless @proxy.password.nil?
end

options = http_client_options.is_a?(Hash) ? base_http_client_options.merge(http_client_options) : base_http_client_options
options = options.transform_keys(&:to_sym)
options = options.select do |key, _|
included = VALID_HTTP_CLIENT_OPTIONS.include?(key)
@logger.warn { "Ignoring unsupported HTTP client option: #{key}" } unless included
included
end

timeout_options = {}
timeout_options[:connect_timeout] = @connect_timeout if @connect_timeout
timeout_options[:read_timeout] = @read_timeout if @read_timeout
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

@http_client = HTTP::Client.new(options)
@http_client = HTTP::Client.new(**options)
.follow
.timeout({
read: read_timeout,
connect: connect_timeout,
})
.timeout(timeout_options)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
@cxn = nil
@lock = Mutex.new

Expand Down Expand Up @@ -342,7 +358,7 @@ def connect
begin
uri = build_uri_with_query_params
@logger.info { "Connecting to event stream at #{uri}" }
cxn = @http_client.request(@method, uri, build_opts)
cxn = @http_client.request(@method, uri, **build_opts)
headers = cxn.headers
if cxn.status.code == 200
content_type = cxn.content_type.mime_type
Expand Down Expand Up @@ -390,8 +406,10 @@ def read_stream(cxn)
rescue HTTP::TimeoutError
# For historical reasons, we rethrow this as our own type
raise Errors::ReadTimeoutError.new(@read_timeout)
rescue EOFError
break
end
break if data.nil?
break if data.nil? # keep for v5 compat
gen.yield data
end
end
Expand Down
97 changes: 97 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,103 @@ def test_object.to_s
end
end

describe "http_client_options filtering" do
it "filters out unsupported options" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

client = subject.new(server.base_uri,
http_client_options: {
"socket_class" => "MySocket",
"ssl" => { verify_mode: 0 },
"not_a_real_option" => "should be removed",
"another_fake" => 123,
})

http_client = client.instance_variable_get(:@http_client)
options = http_client.default_options

expect(options.socket_class).to eq("MySocket")
expect(options.ssl).to eq({ verify_mode: 0 })

client.close
end
end

it "filters out unsupported options provided as symbols" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

client = subject.new(server.base_uri,
http_client_options: {
socket_class: "MySocket",
not_a_real_option: "should be removed",
})

http_client = client.instance_variable_get(:@http_client)
options = http_client.default_options

expect(options.socket_class).to eq("MySocket")

client.close
end
end

it "does not raise when only unsupported options are provided" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

client = nil
expect {
client = subject.new(server.base_uri,
http_client_options: {
"totally_fake" => true,
"also_fake" => "yes",
})
}.not_to raise_error

client.close
end
end

it "preserves all valid options" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

socket_factory = double("SocketFactory")
ssl_socket_factory = double("SSLSocketFactory")

client = subject.new(server.base_uri,
http_client_options: {
socket_class: socket_factory,
ssl_socket_class: ssl_socket_factory,
nodelay: true,
keep_alive_timeout: 30,
ssl: { verify_mode: 0 },
})

http_client = client.instance_variable_get(:@http_client)
options = http_client.default_options

expect(options.socket_class).to eq(socket_factory)
expect(options.ssl_socket_class).to eq(ssl_socket_factory)
expect(options.nodelay).to eq(true)
expect(options.keep_alive_timeout).to eq(30)
expect(options.ssl).to eq({ verify_mode: 0 })

client.close
end
end
end

describe "retry parameter" do
it "defaults to true (retries enabled)" do
events_body = simple_event_1_text
Expand Down
Loading