diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3ac7484b397..fb8e29f6747 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::error::Error; use std::fmt::{Debug, Display}; use std::future::Future; @@ -1922,7 +1922,12 @@ impl EnvelopeProcessorService { } /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay. - fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) { + fn send_global_partition( + &self, + upstream: Option, + partition_key: u32, + partition: &mut Partition<'_>, + ) { if partition.is_empty() { return; } @@ -1939,6 +1944,7 @@ impl EnvelopeProcessorService { }; let request = SendMetricsRequest { + upstream, partition_key: partition_key.to_string(), unencoded, encoded, @@ -1967,13 +1973,23 @@ impl EnvelopeProcessorService { } = message; let batch_size = self.inner.config.metrics_max_batch_size_bytes(); - let mut partition = Partition::new(batch_size); + let mut partitions = BTreeMap::new(); let mut partition_splits = 0; for ProjectBuckets { - buckets, scoping, .. + buckets, + scoping, + project_info, + .. } in buckets.values() { + let partition = match partitions.get_mut(&project_info.upstream) { + Some(partition) => partition, + None => partitions + .entry(project_info.upstream.clone()) + .or_insert_with(|| Partition::new(batch_size)), + }; + for bucket in buckets { let mut remaining = Some(BucketView::new(bucket)); @@ -1982,7 +1998,11 @@ impl EnvelopeProcessorService { // A part of the bucket could not be inserted. Take the partition and submit // it immediately. Repeat until the final part was inserted. This should // always result in a request, otherwise we would enter an endless loop. - self.send_global_partition(partition_key, &mut partition); + self.send_global_partition( + project_info.upstream.clone(), + partition_key, + partition, + ); remaining = Some(next); partition_splits += 1; } @@ -1994,7 +2014,9 @@ impl EnvelopeProcessorService { metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits); } - self.send_global_partition(partition_key, &mut partition); + for (upstream, mut partition) in partitions { + self.send_global_partition(upstream, partition_key, &mut partition); + } } async fn handle_flush_buckets(&self, mut message: FlushBuckets) { @@ -2283,8 +2305,7 @@ impl<'a> Partition<'a> { let buckets = &self.views; let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into(); - let scopings = self.project_info.clone(); - self.project_info.clear(); + let scopings = std::mem::take(&mut self.project_info); self.views.clear(); self.remaining = self.max_size; @@ -2298,6 +2319,8 @@ impl<'a> Partition<'a> { /// This request is not awaited. It automatically tracks outcomes if the request is not received. #[derive(Debug)] struct SendMetricsRequest { + /// Optional upstream override where the request will be sent to. + upstream: Option, /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`. partition_key: String, /// Serialized metric buckets without encoding applied, used for signing. @@ -2348,6 +2371,10 @@ impl SendMetricsRequest { } impl UpstreamRequest for SendMetricsRequest { + fn upstream(&self) -> Option<&UpstreamDescriptor> { + self.upstream.as_ref() + } + fn set_relay_id(&self) -> bool { true } diff --git a/tests/integration/fixtures/mini_proxy.py b/tests/integration/fixtures/mini_proxy.py index f91cdf7c191..38340067fbb 100644 --- a/tests/integration/fixtures/mini_proxy.py +++ b/tests/integration/fixtures/mini_proxy.py @@ -1,4 +1,4 @@ -from queue import Queue +from queue import Queue, Empty import threading import pytest from dataclasses import dataclass @@ -46,6 +46,13 @@ def url(self): def get_captured_request(self, *, timeout=None): return self.captured_requests.get(timeout=timeout or self.timeout) + def assert_empty(self): + try: + req = self.get_captured_request(timeout=0.2) + except Empty: + return + assert False, f"proxy has at least one request remaining: {req}" + @pytest.fixture def mini_proxy(request): diff --git a/tests/integration/test_advertised_upstream.py b/tests/integration/test_advertised_upstream.py index 48a018c0bae..2ee8f0f0c3f 100644 --- a/tests/integration/test_advertised_upstream.py +++ b/tests/integration/test_advertised_upstream.py @@ -2,6 +2,16 @@ import json +def some_metric_bucket(): + return { + "timestamp": int(datetime.now(UTC).timestamp()), + "name": "d:spans/measurements.lcp@millisecond", + "type": "d", + "value": [1.0], + "width": 1, + } + + def test_advertised_upstream_envelope(mini_sentry, mini_proxy, relay): project_id = 42 @@ -18,7 +28,7 @@ def test_advertised_upstream_envelope(mini_sentry, mini_proxy, relay): assert event is not None assert proxy.get_captured_request().path == "/api/42/envelope/" - assert proxy.captured_requests.empty() + proxy.assert_empty() def test_advertised_upstream_metrics(mini_sentry, mini_proxy, relay): @@ -31,24 +41,45 @@ def test_advertised_upstream_metrics(mini_sentry, mini_proxy, relay): relay = relay(mini_sentry) - bucket_name = "d:spans/measurements.lcp@millisecond" - bucket = { - "org_id": 1, - "project_id": project_id, - "timestamp": int(datetime.now(UTC).timestamp()), - "name": bucket_name, - "type": "d", - "value": [1.0], - "width": 1, - } - + bucket = some_metric_bucket() relay.send_metrics_buckets(project_id, [bucket]) envelope = mini_sentry.get_captured_envelope() payload = json.loads(envelope.items[0].payload.get_bytes())[0] - assert payload["name"] == bucket_name + assert payload["name"] == bucket["name"] request = proxy.get_captured_request() assert request.path == "/api/42/envelope/" assert "x-sentry-relay-shard" in request.headers - assert proxy.captured_requests.empty() + proxy.assert_empty() + + +def test_advertised_upstream_global_metrics(mini_sentry, mini_proxy, relay): + project_id1 = 42 + project_id2 = 43 + + proxy = mini_proxy(mini_sentry) + + project_config1 = mini_sentry.add_basic_project_config(project_id1) + project_config1["upstream"] = proxy.url + project_config2 = mini_sentry.add_basic_project_config(project_id2) + + relay = relay(mini_sentry, {"http": {"global_metrics": True}}) + + bucket = some_metric_bucket() + relay.send_metrics_buckets(project_id1, [bucket]) + relay.send_metrics_buckets(project_id2, [bucket]) + + project_keys = { + project_config1["publicKeys"][0]["publicKey"], + project_config2["publicKeys"][0]["publicKey"], + } + for _ in range(2): + metrics_batch = mini_sentry.captured_metrics.get(timeout=5) + assert metrics_batch.keys() <= project_keys + + request = proxy.get_captured_request() + assert request.path == "/api/0/relays/metrics/" + assert "x-sentry-relay-shard" in request.headers + # Only one project config has an upstream override through the proxy. + proxy.assert_empty() diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 815c68c8e86..a0535300d12 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -122,8 +122,6 @@ def test_metrics_proxy_mode_buckets(mini_sentry, relay): bucket_name = "d:spans/measurements.lcp@millisecond" bucket = { - "org_id": 1, - "project_id": project_id, "timestamp": int(datetime.now(UTC).timestamp()), "name": bucket_name, "type": "d",