Skip to content

Commit 8920a0a

Browse files
Merge pull request #5 from BetterStackHQ/alistair/handle-rst-stream-cancel
Handle hyper providing us incomplete requests
2 parents de0cdf9 + 08cee60 commit 8920a0a

2 files changed

Lines changed: 144 additions & 4 deletions

File tree

ext/hyper_ruby/src/lib.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crossbeam_channel;
2727

2828
use hyper::service::service_fn;
2929
use hyper::{Error, Request as HyperRequest, Response as HyperResponse, StatusCode};
30-
use hyper::body::Incoming;
30+
use hyper::body::{Body, Incoming};
3131
use hyper_util::rt::TokioIo;
3232
use hyper_util::server::conn::auto;
3333
use http_body_util::BodyExt;
@@ -494,7 +494,17 @@ async fn handle_request(
494494
debug!("Headers: {:?}", req.headers());
495495

496496
let (parts, body) = req.into_parts();
497-
497+
498+
// Capture the declared body length before consuming the body. Hyper's
499+
// `Incoming::size_hint().exact()` mirrors the inbound Content-Length
500+
// (populated from the header for H2 as well as H1), and is the only
501+
// signal we have here: hyper converts `RST_STREAM(NO_ERROR|CANCEL)` —
502+
// the codes browsers send on navigation cancellation — into a clean
503+
// end-of-body rather than surfacing the reset (see
504+
// hyper-1.6.0/src/body/incoming.rs:~249). We therefore validate the
505+
// declared length against what we actually collected.
506+
let declared_len = body.size_hint().exact();
507+
498508
// Collect the body with timeout
499509
let body_bytes = match timeout(
500510
std::time::Duration::from_millis(recv_timeout),
@@ -510,9 +520,19 @@ async fn handle_request(
510520
return Ok(create_timeout_response());
511521
}
512522
};
513-
523+
514524
debug!("Collected body size: {}", body_bytes.len());
515525

526+
if let Some(declared) = declared_len {
527+
if declared != body_bytes.len() as u64 {
528+
warn!(
529+
"Body truncated: declared {} bytes, received {} — likely RST_STREAM(CANCEL); rejecting request",
530+
declared, body_bytes.len()
531+
);
532+
return Ok(create_bad_request_response("Body length does not match Content-Length"));
533+
}
534+
}
535+
516536
let hyper_request = HyperRequest::from_parts(parts, body_bytes);
517537
let is_grpc = grpc::is_grpc_request(&hyper_request);
518538
debug!("Is gRPC: {}", is_grpc);
@@ -628,11 +648,19 @@ fn create_too_many_requests_response(error_message: &str) -> HyperResponse<BodyW
628648
let builder = HyperResponse::builder()
629649
.status(StatusCode::TOO_MANY_REQUESTS)
630650
.header("content-type", "text/plain");
631-
651+
632652
builder.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None))
633653
.unwrap()
634654
}
635655

656+
fn create_bad_request_response(error_message: &str) -> HyperResponse<BodyWithTrailers> {
657+
HyperResponse::builder()
658+
.status(StatusCode::BAD_REQUEST)
659+
.header("content-type", "text/plain")
660+
.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None))
661+
.unwrap()
662+
}
663+
636664
#[magnus::init]
637665
fn init(ruby: &Ruby) -> Result<(), MagnusError> {
638666
let module = ruby.define_module("HyperRuby")?;

test/test_h2_stream_reset.rb

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
require "httpx" # with_configured_server builds an HTTPX client (unused here)
5+
require "socket"
6+
require "timeout"
7+
8+
# Regression test for a silent-truncation bug: when an HTTP/2 peer sends
9+
# HEADERS + a partial DATA frame + RST_STREAM (the frame sequence a browser
10+
# produces when it cancels an in-flight request on page navigation), the
11+
# server used to hand the truncated body to the Ruby handler as if the
12+
# request had completed normally. Downstream consumers (Kafka, etc.) then
13+
# saw a short body alongside the original Content-Length header, producing
14+
# confusingly "truncated" payloads.
15+
#
16+
# Correct behaviour: a stream that ends via RST_STREAM (not END_STREAM) is
17+
# not a completed request; the handler should not run.
18+
class TestH2StreamReset < HyperRubyTest
19+
PORT = 3010
20+
PARTIAL_BYTES = 16_384 # default HTTP/2 SETTINGS_MAX_FRAME_SIZE
21+
CLAIMED_CONTENT_LENGTH = 450_403
22+
23+
def test_rst_stream_after_partial_data_does_not_invoke_handler
24+
invocations = []
25+
mutex = Mutex.new
26+
27+
handler = lambda do |request|
28+
mutex.synchronize do
29+
invocations << {
30+
path: request.path,
31+
body_size: request.body_size,
32+
content_length: request.header("content-length"),
33+
}
34+
end
35+
HyperRuby::Response.new(200, { "Content-Type" => "text/plain" }, "ok")
36+
end
37+
38+
config = { bind_address: "127.0.0.1:#{PORT}", tokio_threads: 1, recv_timeout: 1_000 }
39+
40+
with_configured_server(config, handler) do
41+
send_h2_headers_data_rst(
42+
host: "127.0.0.1",
43+
port: PORT,
44+
partial_bytes: PARTIAL_BYTES,
45+
claimed_content_length: CLAIMED_CONTENT_LENGTH,
46+
)
47+
48+
# Give the server time to hand off to the worker if it's going to.
49+
sleep 0.2
50+
end
51+
52+
mutex.synchronize do
53+
assert_empty invocations,
54+
"handler should not have been invoked for a RST_STREAM'd request, but it ran with: #{invocations.inspect}"
55+
end
56+
end
57+
58+
private
59+
60+
H2_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".b
61+
TYPE_DATA = 0x0
62+
TYPE_HEADERS = 0x1
63+
TYPE_RST_STREAM = 0x3
64+
TYPE_SETTINGS = 0x4
65+
FLAG_END_HEADERS = 0x4
66+
FLAG_ACK = 0x1
67+
RST_CODE_CANCEL = 0x8 # what Chromium sends on navigation
68+
69+
def h2_frame(type, flags, stream_id, payload)
70+
len = payload.bytesize
71+
[len >> 16 & 0xff, len >> 8 & 0xff, len & 0xff,
72+
type, flags, stream_id & 0x7fffffff].pack("CCCCCN") + payload
73+
end
74+
75+
# Minimal HPACK: "literal header field, never indexed, new name" with
76+
# 7-bit string lengths. Names and values must fit in 126 bytes — fine for
77+
# everything we send here.
78+
def hpack_literal(name, value)
79+
raise "name too long" if name.bytesize > 126
80+
raise "value too long" if value.bytesize > 126
81+
[0x10, name.bytesize].pack("CC") + name.b +
82+
[value.bytesize].pack("C") + value.b
83+
end
84+
85+
def send_h2_headers_data_rst(host:, port:, partial_bytes:, claimed_content_length:)
86+
sock = TCPSocket.new(host, port)
87+
sock.write(H2_PREFACE)
88+
sock.write(h2_frame(TYPE_SETTINGS, 0, 0, "".b))
89+
sock.write(h2_frame(TYPE_SETTINGS, FLAG_ACK, 0, "".b))
90+
91+
headers = "".b
92+
headers << hpack_literal(":method", "POST")
93+
headers << hpack_literal(":scheme", "http")
94+
headers << hpack_literal(":path", "/rst-truncated")
95+
headers << hpack_literal(":authority", "#{host}:#{port}")
96+
headers << hpack_literal("content-type", "application/octet-stream")
97+
headers << hpack_literal("content-length", claimed_content_length.to_s)
98+
99+
sock.write(h2_frame(TYPE_HEADERS, FLAG_END_HEADERS, 1, headers))
100+
sock.write(h2_frame(TYPE_DATA, 0, 1, "X".b * partial_bytes))
101+
sock.write(h2_frame(TYPE_RST_STREAM, 0, 1, [RST_CODE_CANCEL].pack("N")))
102+
103+
# Drain anything the server may have written before we hang up; we don't
104+
# care what it is.
105+
begin
106+
Timeout.timeout(0.5) { sock.read(4096) }
107+
rescue Timeout::Error
108+
end
109+
ensure
110+
sock&.close
111+
end
112+
end

0 commit comments

Comments
 (0)