Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
Expand Down Expand Up @@ -1922,7 +1922,12 @@ impl EnvelopeProcessorService {
}

/// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
fn send_global_partition(
&self,
upstream: Option<UpstreamDescriptor>,
partition_key: u32,
partition: &mut Partition<'_>,
) {
if partition.is_empty() {
return;
}
Expand All @@ -1939,6 +1944,7 @@ impl EnvelopeProcessorService {
};

let request = SendMetricsRequest {
upstream,
partition_key: partition_key.to_string(),
unencoded,
encoded,
Expand Down Expand Up @@ -1967,13 +1973,23 @@ impl EnvelopeProcessorService {
} = message;

let batch_size = self.inner.config.metrics_max_batch_size_bytes();
let mut partition = Partition::new(batch_size);
let mut partitions = BTreeMap::new();
let mut partition_splits = 0;

for ProjectBuckets {
buckets, scoping, ..
buckets,
scoping,
project_info,
..
} in buckets.values()
{
let partition = match partitions.get_mut(&project_info.upstream) {
Some(partition) => partition,
None => partitions
.entry(project_info.upstream.clone())
.or_insert_with(|| Partition::new(batch_size)),
};

for bucket in buckets {
let mut remaining = Some(BucketView::new(bucket));

Expand All @@ -1982,7 +1998,11 @@ impl EnvelopeProcessorService {
// A part of the bucket could not be inserted. Take the partition and submit
// it immediately. Repeat until the final part was inserted. This should
// always result in a request, otherwise we would enter an endless loop.
self.send_global_partition(partition_key, &mut partition);
self.send_global_partition(
project_info.upstream.clone(),
partition_key,
partition,
);
remaining = Some(next);
partition_splits += 1;
}
Expand All @@ -1994,7 +2014,9 @@ impl EnvelopeProcessorService {
metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
}

self.send_global_partition(partition_key, &mut partition);
for (upstream, mut partition) in partitions {
self.send_global_partition(upstream, partition_key, &mut partition);
}
}

async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
Expand Down Expand Up @@ -2283,8 +2305,7 @@ impl<'a> Partition<'a> {
let buckets = &self.views;
let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

let scopings = self.project_info.clone();
self.project_info.clear();
let scopings = std::mem::take(&mut self.project_info);

self.views.clear();
self.remaining = self.max_size;
Expand All @@ -2298,6 +2319,8 @@ impl<'a> Partition<'a> {
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
/// Optional upstream override where the request will be sent to.
upstream: Option<UpstreamDescriptor>,
/// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
partition_key: String,
/// Serialized metric buckets without encoding applied, used for signing.
Expand Down Expand Up @@ -2348,6 +2371,10 @@ impl SendMetricsRequest {
}

impl UpstreamRequest for SendMetricsRequest {
/// Returns a borrowed view of the optional upstream override stored on this
/// request (`None` when no override was set for the partition).
fn upstream(&self) -> Option<&UpstreamDescriptor> {
// `as_ref` turns `&Option<T>` into `Option<&T>` without cloning the descriptor.
self.upstream.as_ref()
}

/// Always returns `true` for metrics requests.
///
/// NOTE(review): presumably this tells the upstream layer to attach the relay
/// id to the outgoing request; confirm against the `UpstreamRequest` trait docs.
fn set_relay_id(&self) -> bool {
true
}
Expand Down
9 changes: 8 additions & 1 deletion tests/integration/fixtures/mini_proxy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from queue import Queue
from queue import Queue, Empty
import threading
import pytest
from dataclasses import dataclass
Expand Down Expand Up @@ -46,6 +46,13 @@ def url(self):
def get_captured_request(self, *, timeout=None):
    """Return the next request captured by the proxy.

    Waits on ``self.captured_requests`` for up to ``timeout`` seconds. When
    ``timeout`` is ``None`` the proxy-wide default ``self.timeout`` is used.

    If nothing arrives in time, the exception raised by the underlying
    queue's ``get`` propagates to the caller (``queue.Empty`` for a standard
    ``queue.Queue``).
    """
    # Compare against None instead of relying on truthiness, so an explicit
    # ``timeout=0`` means "do not wait" rather than silently falling back to
    # the (much longer) default timeout.
    wait = self.timeout if timeout is None else timeout
    return self.captured_requests.get(timeout=wait)

def assert_empty(self):
    """Fail if the proxy still holds a captured request.

    Waits briefly (0.2 seconds) for a request via ``get_captured_request``.
    Returns ``None`` when the wait ends with ``Empty``; otherwise raises
    ``AssertionError`` naming the unexpected request.
    """
    try:
        leftover = self.get_captured_request(timeout=0.2)
    except Empty:
        # Nothing showed up within the grace period: the proxy is drained.
        return
    # Raise explicitly instead of using `assert False`: assert statements are
    # removed when Python runs with -O, which would make this check pass
    # silently even though a request is still pending.
    raise AssertionError(f"proxy has at least one request remaining: {leftover}")


@pytest.fixture
def mini_proxy(request):
Expand Down
59 changes: 45 additions & 14 deletions tests/integration/test_advertised_upstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
import json


def some_metric_bucket():
return {
"timestamp": int(datetime.now(UTC).timestamp()),
"name": "d:spans/measurements.lcp@millisecond",
"type": "d",
"value": [1.0],
"width": 1,
}


def test_advertised_upstream_envelope(mini_sentry, mini_proxy, relay):
project_id = 42

Expand All @@ -18,7 +28,7 @@ def test_advertised_upstream_envelope(mini_sentry, mini_proxy, relay):
assert event is not None

assert proxy.get_captured_request().path == "/api/42/envelope/"
assert proxy.captured_requests.empty()
proxy.assert_empty()


def test_advertised_upstream_metrics(mini_sentry, mini_proxy, relay):
Expand All @@ -31,24 +41,45 @@ def test_advertised_upstream_metrics(mini_sentry, mini_proxy, relay):

relay = relay(mini_sentry)

bucket_name = "d:spans/measurements.lcp@millisecond"
bucket = {
"org_id": 1,
"project_id": project_id,
"timestamp": int(datetime.now(UTC).timestamp()),
"name": bucket_name,
"type": "d",
"value": [1.0],
"width": 1,
}

bucket = some_metric_bucket()
relay.send_metrics_buckets(project_id, [bucket])

envelope = mini_sentry.get_captured_envelope()
payload = json.loads(envelope.items[0].payload.get_bytes())[0]
assert payload["name"] == bucket_name
assert payload["name"] == bucket["name"]

request = proxy.get_captured_request()
assert request.path == "/api/42/envelope/"
assert "x-sentry-relay-shard" in request.headers
assert proxy.captured_requests.empty()
proxy.assert_empty()


def test_advertised_upstream_global_metrics(mini_sentry, mini_proxy, relay):
    """Global metrics are split per advertised upstream.

    Two projects send the same bucket through a relay with
    ``http.global_metrics`` enabled. Only the first project's config sets
    ``upstream`` to the proxy URL. The test expects two metric batches to be
    captured by ``mini_sentry`` and exactly one request to be captured by the
    proxy.
    """
    proxied_project_id = 42
    direct_project_id = 43

    proxy = mini_proxy(mini_sentry)

    # Only the first project advertises the proxy as its upstream.
    proxied_config = mini_sentry.add_basic_project_config(proxied_project_id)
    proxied_config["upstream"] = proxy.url
    direct_config = mini_sentry.add_basic_project_config(direct_project_id)

    relay = relay(mini_sentry, {"http": {"global_metrics": True}})

    bucket = some_metric_bucket()
    for project_id in (proxied_project_id, direct_project_id):
        relay.send_metrics_buckets(project_id, [bucket])

    project_keys = {
        config["publicKeys"][0]["publicKey"]
        for config in (proxied_config, direct_config)
    }
    # Two batches are expected; each may only contain the known project keys.
    for _ in range(2):
        metrics_batch = mini_sentry.captured_metrics.get(timeout=5)
        assert metrics_batch.keys() <= project_keys

    request = proxy.get_captured_request()
    assert request.path == "/api/0/relays/metrics/"
    assert "x-sentry-relay-shard" in request.headers
    # Only one project config has an upstream override through the proxy.
    proxy.assert_empty()
2 changes: 0 additions & 2 deletions tests/integration/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ def test_metrics_proxy_mode_buckets(mini_sentry, relay):
bucket_name = "d:spans/measurements.lcp@millisecond"

bucket = {
"org_id": 1,
"project_id": project_id,
"timestamp": int(datetime.now(UTC).timestamp()),
"name": bucket_name,
"type": "d",
Expand Down
Loading