diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d75f0e2de7..0012227642e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ **Internal**: - Bump `sentry-conventions` to 0.6.0-4. ([#5944](https://github.com/getsentry/relay/pull/5944)) +- Enable compression for forwarded uploads. ([#5965](https://github.com/getsentry/relay/pull/5965)) ## 26.4.2 diff --git a/Cargo.lock b/Cargo.lock index 2adc33800e1..c849b59921b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4912,6 +4912,7 @@ dependencies = [ "ahash", "anyhow", "arc-swap", + "async-compression", "axum", "axum-extra", "axum-server", diff --git a/Cargo.toml b/Cargo.toml index 957383369c4..bbb9505f4bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. anyhow = "=1.0.69" arc-swap = "1" +async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 13cbe16e25c..54b1e19f6b6 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -32,6 +32,7 @@ workspace = true ahash = { workspace = true } anyhow = { workspace = true } arc-swap = { workspace = true } +async-compression = { workspace = true } axum = { workspace = true, features = ["macros", "matched-path", "tracing"] } axum-extra = { workspace = true } axum-server = { workspace = true } diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 78788bbbcfd..f678ff51fbd 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -4,9 +4,11 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; +use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder, ZstdEncoder}; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; +use futures::StreamExt; use futures::stream::BoxStream; use http::{HeaderValue, Method}; use relay_auth::Signature; @@ -14,14 +16,17 @@ use relay_auth::Signature; use relay_auth::SignatureHeader; use relay_base_schema::project::ProjectId; use relay_config::Config; +use relay_config::HttpEncoding; use relay_quotas::Scoping; use relay_system::{ Addr, AsyncResponse, ConcurrentService, FromMessage, Interface, LoadShed, SendError, Sender, SimpleService, }; use serde::Deserialize; +use tokio::io::BufReader; use tokio::sync::oneshot; use tokio::sync::oneshot::error::RecvError; +use tokio_util::io::{ReaderStream, StreamReader}; #[cfg(feature = "processing")] use uuid::Uuid; @@ -458,6 +463,7 @@ enum RequestKind { Upload { location: SignedLocation, stream: Option>>, + encoding: HttpEncoding, }, } @@ -508,6 +514,7 @@ impl UploadRequest { kind: RequestKind::Upload { location, stream: Some(stream), + encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() }, sender, }, @@ -579,12 +586,19 @@ impl UpstreamRequest for UploadRequest { fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> { let upload_length = self.length(); - if let RequestKind::Upload { stream, .. } = &mut self.kind { + if let RequestKind::Upload { + stream, encoding, .. + } = &mut self.kind + { let Some(body) = stream.take() else { relay_log::error!("upload request was retried or never initialized"); return Err(HttpError::Misconfigured); }; tus::add_upload_headers(builder); + + let body = encode_body(body, *encoding); + builder.content_encoding(*encoding); + builder.body(reqwest::Body::wrap_stream(body)); } else { tus::add_creation_headers(upload_length, builder); @@ -596,4 +610,24 @@ impl UpstreamRequest for UploadRequest { Ok(()) } + + fn configure(&mut self, config: &Config) { + if let RequestKind::Upload { encoding, .. } = &mut self.kind { + *encoding = config.http_encoding(); + } + } +} + +fn encode_body(stream: S, encoding: HttpEncoding) -> ByteStream +where + S: futures::Stream> + Send + 'static, +{ + let reader = BufReader::new(StreamReader::new(stream)); + match encoding { + HttpEncoding::Identity => ReaderStream::new(reader).boxed(), + HttpEncoding::Deflate => ReaderStream::new(DeflateEncoder::new(reader)).boxed(), + HttpEncoding::Gzip => ReaderStream::new(GzipEncoder::new(reader)).boxed(), + HttpEncoding::Br => ReaderStream::new(BrotliEncoder::new(reader)).boxed(), + HttpEncoding::Zstd => ReaderStream::new(ZstdEncoder::new(reader)).boxed(), + } } diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a9d51be15c7..2855ccd073b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -5,7 +5,7 @@ from typing import Optional import json import redis -from flask import Response +from flask import Response, request import pytest @@ -47,7 +47,7 @@ objectstore, ) -from .consts import DUMMY_UPLOAD_LOCATION +from .consts import DUMMY_UPLOAD_LOCATION, ZSTD_MAGIC_HEADER @pytest.fixture(scope="session") @@ -314,6 +314,8 @@ def create(**opts): @mini_sentry.app.route("/api//upload//", methods=["PATCH"]) def upload(**opts): + assert request.headers["Content-Encoding"] == "zstd" + assert request.data.startswith(ZSTD_MAGIC_HEADER) return Response( "", status=204, diff --git a/tests/integration/consts.py b/tests/integration/consts.py index 0359eb0c602..5846cbb5081 100644 --- a/tests/integration/consts.py +++ b/tests/integration/consts.py @@ -3,3 +3,5 @@ DUMMY_UPLOAD_PATH = "/api/42/upload/019cdc82ed6c7761ba21fd34b86481c2/" DUMMY_UPLOAD_LOCATION = f"{DUMMY_UPLOAD_PATH}?length=11&signature=z_fUMhT0EZqJz6OQtwGHqTlOOLPpTVpvPa-rYTg18FVWZM1OGny-LeVJB5H-sSR_5e--I1xt-FlCmRG2bsmcAQ.eyJ0IjoiMjAyNi0wMy0xMVQxMDo0ODoxMy45NDM1ODNaIn0" + +ZSTD_MAGIC_HEADER = b"\x28\xb5\x2f\xfd"