From 2902ebf2b0991644908907c0f576c5ec6420ffe2 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 8 May 2026 13:59:18 +0200 Subject: [PATCH 1/7] feat(upload): Enable gzip encoding for forwarded uploads --- Cargo.lock | 1 + Cargo.toml | 1 + relay-server/Cargo.toml | 1 + relay-server/src/services/upload.rs | 9 +++++++++ 4 files changed, 12 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 2adc33800e1..c849b59921b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4912,6 +4912,7 @@ dependencies = [ "ahash", "anyhow", "arc-swap", + "async-compression", "axum", "axum-extra", "axum-server", diff --git a/Cargo.toml b/Cargo.toml index 957383369c4..78b5632eeca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. anyhow = "=1.0.69" arc-swap = "1" +async-compression = { version = "0.4", features = ["tokio", "gzip"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 13cbe16e25c..54b1e19f6b6 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -32,6 +32,7 @@ workspace = true ahash = { workspace = true } anyhow = { workspace = true } arc-swap = { workspace = true } +async-compression = { workspace = true } axum = { workspace = true, features = ["macros", "matched-path", "tracing"] } axum-extra = { workspace = true } axum-server = { workspace = true } diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 78788bbbcfd..4abac43ea06 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -4,6 +4,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; +use async_compression::tokio::bufread::GzipEncoder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -20,8 +21,11 @@ use relay_system::{ SimpleService, }; use serde::Deserialize; +use tokio::io::BufReader; use 
tokio::sync::oneshot; use tokio::sync::oneshot::error::RecvError; +use tokio_util::io::ReaderStream; +use tokio_util::io::StreamReader; #[cfg(feature = "processing")] use uuid::Uuid; @@ -585,6 +589,11 @@ impl UpstreamRequest for UploadRequest { return Err(HttpError::Misconfigured); }; tus::add_upload_headers(builder); + + // TODO: chose based on config.http.encoding instead + let body = ReaderStream::new(GzipEncoder::new(BufReader::new(StreamReader::new(body)))); + builder.content_encoding(relay_config::HttpEncoding::Gzip); + builder.body(reqwest::Body::wrap_stream(body)); } else { tus::add_creation_headers(upload_length, builder); From c00e517793546218d5ee9f74929a24ff7bd836c0 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 8 May 2026 19:45:45 +0200 Subject: [PATCH 2/7] feat(upload): Use configured HTTP encoding for forwarded uploads Replace the hardcoded gzip encoder in `UploadRequest::build` with a dispatch over `config.http_encoding()`, so forwarded uploads can use deflate/brotli/zstd in addition to gzip. Fixes the `encode_body` helper to compile (proper stream bounds, boxed return type) and corrects a ZstdDecoder/Encoder typo. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.toml | 2 +- relay-server/src/services/upload.rs | 37 +++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 78b5632eeca..bbb9505f4bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. 
anyhow = "=1.0.69" arc-swap = "1" -async-compression = { version = "0.4", features = ["tokio", "gzip"] } +async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 4abac43ea06..e94779c5636 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -4,10 +4,14 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; +use async_compression::tokio::bufread::BrotliEncoder; +use async_compression::tokio::bufread::DeflateEncoder; use async_compression::tokio::bufread::GzipEncoder; +use async_compression::tokio::bufread::ZstdEncoder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; +use futures::StreamExt; use futures::stream::BoxStream; use http::{HeaderValue, Method}; use relay_auth::Signature; @@ -15,6 +19,7 @@ use relay_auth::Signature; use relay_auth::SignatureHeader; use relay_base_schema::project::ProjectId; use relay_config::Config; +use relay_config::HttpEncoding; use relay_quotas::Scoping; use relay_system::{ Addr, AsyncResponse, ConcurrentService, FromMessage, Interface, LoadShed, SendError, Sender, @@ -462,6 +467,7 @@ enum RequestKind { Upload { location: SignedLocation, stream: Option>>, + encoding: HttpEncoding, }, } @@ -512,6 +518,7 @@ impl UploadRequest { kind: RequestKind::Upload { location, stream: Some(stream), + encoding: HttpEncoding::Zstd, }, sender, }, @@ -583,16 +590,18 @@ impl UpstreamRequest for UploadRequest { fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> { let upload_length = self.length(); - if let RequestKind::Upload { stream, .. } = &mut self.kind { + if let RequestKind::Upload { + stream, encoding, .. 
+ } = &mut self.kind + { let Some(body) = stream.take() else { relay_log::error!("upload request was retried or never initialized"); return Err(HttpError::Misconfigured); }; tus::add_upload_headers(builder); - // TODO: chose based on config.http.encoding instead - let body = ReaderStream::new(GzipEncoder::new(BufReader::new(StreamReader::new(body)))); - builder.content_encoding(relay_config::HttpEncoding::Gzip); + let body = encode_body(body, *encoding); + builder.content_encoding(*encoding); builder.body(reqwest::Body::wrap_stream(body)); } else { @@ -605,4 +614,24 @@ impl UpstreamRequest for UploadRequest { Ok(()) } + + fn configure(&mut self, config: &Config) { + if let RequestKind::Upload { encoding, .. } = &mut self.kind { + *encoding = config.http_encoding(); + } + } +} + +fn encode_body(stream: S, encoding: HttpEncoding) -> ByteStream +where + S: futures::Stream> + Send + 'static, +{ + let reader = BufReader::new(StreamReader::new(stream)); + match encoding { + HttpEncoding::Identity => ReaderStream::new(reader).boxed(), + HttpEncoding::Deflate => ReaderStream::new(DeflateEncoder::new(reader)).boxed(), + HttpEncoding::Gzip => ReaderStream::new(GzipEncoder::new(reader)).boxed(), + HttpEncoding::Br => ReaderStream::new(BrotliEncoder::new(reader)).boxed(), + HttpEncoding::Zstd => ReaderStream::new(ZstdEncoder::new(reader)).boxed(), + } } From f78c3a68bbfb738109c7e5064a369059f35d0c23 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 8 May 2026 20:08:23 +0200 Subject: [PATCH 3/7] test to repro --- relay-server/src/services/upload.rs | 2 +- tests/integration/test_upload.py | 74 +++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index e94779c5636..d4eeeaf47d6 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -600,7 +600,7 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - 
let body = encode_body(body, *encoding); + let body = encode_body(body, dbg!(*encoding)); builder.content_encoding(*encoding); builder.body(reqwest::Body::wrap_stream(body)); diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 413ef2b7829..74b0c4f7064 100644 --- a/tests/integration/test_upload.py +++ b/tests/integration/test_upload.py @@ -2,6 +2,8 @@ Tests for the TUS upload endpoint (/api/{project_id}/upload/). """ +import gzip +import io import time import uuid from concurrent.futures import ThreadPoolExecutor, as_completed @@ -604,3 +606,75 @@ def upload_something(relay, project_id, project_key): }, data=data, ) + + +def test_large_gzip_upload_latency( + mini_sentry, + relay, + relay_with_processing, + project_config, + objectstore, + events_consumer, +): + """Measure response latency after the last data frame for a 200 MB gzip-encoded upload. + + Sends through the full chain (outer relay -> processing relay -> objectstore) a body + consisting of 200 MB of zeros encoded with Content-Encoding: gzip. + """ + mini_sentry.allow_chunked = True + project_id = 42 + project_key = mini_sentry.get_dsn_public_key(project_id) + + processing_relay = relay_with_processing() + relay = relay(processing_relay) + + # Wait for project config propagation through the chain. 
+ events_consumer = events_consumer() + relay.send_event(project_id) + events_consumer.get_event() + + raw_size = 200 * 1024 * 1024 + buf = io.BytesIO() + with gzip.GzipFile(fileobj=buf, mode="wb") as f: + f.write(b"\x00" * raw_size) + gzipped = buf.getvalue() + + response = relay.post( + f"/api/{project_id}/upload/?sentry_key={project_key}", + headers={ + "Content-Length": "0", + "Tus-Resumable": "1.0.0", + "Upload-Length": str(raw_size), + }, + ) + assert response.status_code == 201, response.text + location = response.headers["Location"] + + last_frame_time = None + + def body(): + nonlocal last_frame_time + chunk_size = 1024 + for i in range(0, len(gzipped), chunk_size): + chunk = gzipped[i : i + chunk_size] + yield chunk + print("DONE SENDING DATA") + last_frame_time = time.monotonic() + + response = relay.patch( + f"{location}&sentry_key={project_key}", + headers={ + "Content-Type": "application/offset+octet-stream", + "Content-Encoding": "gzip", + "Tus-Resumable": "1.0.0", + "Upload-Offset": "0", + }, + data=body(), + timeout=120, + ) + response_time = time.monotonic() + + assert response.status_code == 204, response.text + assert last_frame_time is not None + elapsed = response_time - last_frame_time + print(f"Time between last data frame and response: {elapsed:.3f}s") From e27eb45cfa6da036703105ae81d1e72be9cd1c92 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 8 May 2026 20:24:06 +0200 Subject: [PATCH 4/7] Revert "test to repro" This reverts commit f78c3a68bbfb738109c7e5064a369059f35d0c23. 
--- relay-server/src/services/upload.rs | 2 +- tests/integration/test_upload.py | 74 ----------------------------- 2 files changed, 1 insertion(+), 75 deletions(-) diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index d4eeeaf47d6..e94779c5636 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -600,7 +600,7 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let body = encode_body(body, dbg!(*encoding)); + let body = encode_body(body, *encoding); builder.content_encoding(*encoding); builder.body(reqwest::Body::wrap_stream(body)); diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 74b0c4f7064..413ef2b7829 100644 --- a/tests/integration/test_upload.py +++ b/tests/integration/test_upload.py @@ -2,8 +2,6 @@ Tests for the TUS upload endpoint (/api/{project_id}/upload/). """ -import gzip -import io import time import uuid from concurrent.futures import ThreadPoolExecutor, as_completed @@ -606,75 +604,3 @@ def upload_something(relay, project_id, project_key): }, data=data, ) - - -def test_large_gzip_upload_latency( - mini_sentry, - relay, - relay_with_processing, - project_config, - objectstore, - events_consumer, -): - """Measure response latency after the last data frame for a 200 MB gzip-encoded upload. - - Sends through the full chain (outer relay -> processing relay -> objectstore) a body - consisting of 200 MB of zeros encoded with Content-Encoding: gzip. - """ - mini_sentry.allow_chunked = True - project_id = 42 - project_key = mini_sentry.get_dsn_public_key(project_id) - - processing_relay = relay_with_processing() - relay = relay(processing_relay) - - # Wait for project config propagation through the chain. 
- events_consumer = events_consumer() - relay.send_event(project_id) - events_consumer.get_event() - - raw_size = 200 * 1024 * 1024 - buf = io.BytesIO() - with gzip.GzipFile(fileobj=buf, mode="wb") as f: - f.write(b"\x00" * raw_size) - gzipped = buf.getvalue() - - response = relay.post( - f"/api/{project_id}/upload/?sentry_key={project_key}", - headers={ - "Content-Length": "0", - "Tus-Resumable": "1.0.0", - "Upload-Length": str(raw_size), - }, - ) - assert response.status_code == 201, response.text - location = response.headers["Location"] - - last_frame_time = None - - def body(): - nonlocal last_frame_time - chunk_size = 1024 - for i in range(0, len(gzipped), chunk_size): - chunk = gzipped[i : i + chunk_size] - yield chunk - print("DONE SENDING DATA") - last_frame_time = time.monotonic() - - response = relay.patch( - f"{location}&sentry_key={project_key}", - headers={ - "Content-Type": "application/offset+octet-stream", - "Content-Encoding": "gzip", - "Tus-Resumable": "1.0.0", - "Upload-Offset": "0", - }, - data=body(), - timeout=120, - ) - response_time = time.monotonic() - - assert response.status_code == 204, response.text - assert last_frame_time is not None - elapsed = response_time - last_frame_time - print(f"Time between last data frame and response: {elapsed:.3f}s") From a6cda14598d1a8257231ae1da16f33c5c1d2fc2d Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 8 May 2026 20:49:36 +0200 Subject: [PATCH 5/7] assertions --- tests/integration/conftest.py | 6 ++++-- tests/integration/consts.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a9d51be15c7..2855ccd073b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -5,7 +5,7 @@ from typing import Optional import json import redis -from flask import Response +from flask import Response, request import pytest @@ -47,7 +47,7 @@ objectstore, ) -from .consts import DUMMY_UPLOAD_LOCATION +from 
.consts import DUMMY_UPLOAD_LOCATION, ZSTD_MAGIC_HEADER @pytest.fixture(scope="session") @@ -314,6 +314,8 @@ def create(**opts): @mini_sentry.app.route("/api//upload//", methods=["PATCH"]) def upload(**opts): + assert request.headers["Content-Encoding"] == "zstd" + assert request.data.startswith(ZSTD_MAGIC_HEADER) return Response( "", status=204, diff --git a/tests/integration/consts.py b/tests/integration/consts.py index 0359eb0c602..5846cbb5081 100644 --- a/tests/integration/consts.py +++ b/tests/integration/consts.py @@ -3,3 +3,5 @@ DUMMY_UPLOAD_PATH = "/api/42/upload/019cdc82ed6c7761ba21fd34b86481c2/" DUMMY_UPLOAD_LOCATION = f"{DUMMY_UPLOAD_PATH}?length=11&signature=z_fUMhT0EZqJz6OQtwGHqTlOOLPpTVpvPa-rYTg18FVWZM1OGny-LeVJB5H-sSR_5e--I1xt-FlCmRG2bsmcAQ.eyJ0IjoiMjAyNi0wMy0xMVQxMDo0ODoxMy45NDM1ODNaIn0" + +ZSTD_MAGIC_HEADER = b"\x28\xb5\x2f\xfd" From 1c9774c96c0889902eb29a6b4970c24426746b57 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 8 May 2026 20:59:46 +0200 Subject: [PATCH 6/7] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d75f0e2de7..0012227642e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ **Internal**: - Bump `sentry-conventions` to 0.6.0-4. ([#5944](https://github.com/getsentry/relay/pull/5944)) +- Enable compression for forwarded uploads. 
([#5965](https://github.com/getsentry/relay/pull/5965)) ## 26.4.2 From c05e0750626cce488efdff31fe7d466b46427b1d Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 11 May 2026 09:00:16 +0200 Subject: [PATCH 7/7] Apply suggestions from code review Co-authored-by: Sebastian Zivota Co-authored-by: Joris Bayer --- relay-server/src/services/upload.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index e94779c5636..f678ff51fbd 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -4,10 +4,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use async_compression::tokio::bufread::BrotliEncoder; -use async_compression::tokio::bufread::DeflateEncoder; -use async_compression::tokio::bufread::GzipEncoder; -use async_compression::tokio::bufread::ZstdEncoder; +use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder, ZstdEncoder}; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -29,8 +26,7 @@ use serde::Deserialize; use tokio::io::BufReader; use tokio::sync::oneshot; use tokio::sync::oneshot::error::RecvError; -use tokio_util::io::ReaderStream; -use tokio_util::io::StreamReader; +use tokio_util::io::{ReaderStream, StreamReader}; #[cfg(feature = "processing")] use uuid::Uuid; @@ -518,7 +514,7 @@ impl UploadRequest { kind: RequestKind::Upload { location, stream: Some(stream), - encoding: HttpEncoding::Zstd, + encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() }, sender, },