Skip to content

Commit 2243832

Browse files
committed
Rework like in js
1 parent 0c165af commit 2243832

6 files changed

Lines changed: 92 additions & 60 deletions

File tree

sentry_sdk/_span_batcher.py

Lines changed: 20 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -13,10 +13,10 @@
1313

1414
if TYPE_CHECKING:
1515
from typing import Any, Callable, Optional
16-
from sentry_sdk.traces import StreamedSpan
16+
from sentry_sdk._types import SpanJSON
1717

1818

19-
class SpanBatcher(Batcher["StreamedSpan"]):
19+
class SpanBatcher(Batcher["SpanJSON"]):
2020
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
2121
# a bit of a buffer for spans that appear between the trigger to flush
2222
# and actually flushing the buffer.
@@ -42,7 +42,7 @@ def __init__(
4242
# by trace_id, so that we can then send the buckets each in its own
4343
# envelope.
4444
# trace_id -> span buffer
45-
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
45+
self._span_buffer: dict[str, list["SpanJSON"]] = defaultdict(list)
4646
self._running_size: dict[str, int] = defaultdict(lambda: 0)
4747
self._capture_func = capture_func
4848
self._record_lost_func = record_lost_func
@@ -99,7 +99,7 @@ def _flush_loop(self) -> None:
9999
self._flush()
100100
self._last_full_flush = time.monotonic()
101101

102-
def add(self, span: "StreamedSpan") -> None:
102+
def add(self, span: "SpanJSON") -> None:
103103
# Bail out if the current thread is already executing batcher code.
104104
# This prevents deadlocks when code running inside the batcher (e.g.
105105
# _add_to_envelope during flush, or _flush_event.wait/set) triggers
@@ -115,7 +115,7 @@ def add(self, span: "StreamedSpan") -> None:
115115
return None
116116

117117
with self._lock:
118-
size = len(self._span_buffer[span.trace_id])
118+
size = len(self._span_buffer[span["trace_id"]])
119119
if size >= self.MAX_BEFORE_DROP:
120120
self._record_lost_func(
121121
reason="queue_overflow",
@@ -124,14 +124,15 @@ def add(self, span: "StreamedSpan") -> None:
124124
)
125125
return None
126126

127-
self._span_buffer[span.trace_id].append(span)
128-
self._running_size[span.trace_id] += self._estimate_size(span)
127+
self._span_buffer[span["trace_id"]].append(span)
128+
self._running_size[span["trace_id"]] += self._estimate_size(span)
129129

130130
if (
131131
size + 1 >= self.MAX_BEFORE_FLUSH
132-
or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
132+
or self._running_size[span["trace_id"]]
133+
>= self.MAX_BYTES_BEFORE_FLUSH
133134
):
134-
self._pending_flush.add(span.trace_id)
135+
self._pending_flush.add(span["trace_id"])
135136
notify = True
136137
else:
137138
notify = False
@@ -142,12 +143,12 @@ def add(self, span: "StreamedSpan") -> None:
142143
self._active.flag = False
143144

144145
@staticmethod
145-
def _estimate_size(item: "StreamedSpan") -> int:
146+
def _estimate_size(item: "SpanJSON") -> int:
146147
# Rough estimate of serialized span size that's quick to compute.
147148
# 210 is the rough size of the payload without attributes, and then we
148149
# estimate the attributes separately.
149150
estimate = 210
150-
for value in item._attributes.values():
151+
for value in item["attributes"].values():
151152
estimate += 50
152153

153154
if isinstance(value, str):
@@ -158,26 +159,15 @@ def _estimate_size(item: "StreamedSpan") -> int:
158159
return estimate
159160

160161
@staticmethod
161-
def _to_transport_format(item: "StreamedSpan") -> "Any":
162-
res: "dict[str, Any]" = {
163-
"trace_id": item.trace_id,
164-
"span_id": item.span_id,
165-
"name": item._name if item._name is not None else "<unlabeled span>",
166-
"status": item._status,
167-
"is_segment": item._is_segment(),
168-
"start_timestamp": item._start_timestamp.timestamp(),
169-
}
170-
171-
if item._end_timestamp:
172-
res["end_timestamp"] = item._end_timestamp.timestamp()
173-
174-
if item._parent_span_id:
175-
res["parent_span_id"] = item._parent_span_id
176-
177-
if item._attributes:
162+
def _to_transport_format(item: "SpanJSON") -> "Any":
163+
res = {k: v for k, v in item.items() if k not in ("_segment_span",)}
164+
165+
if item.get("attributes"):
178166
res["attributes"] = {
179-
k: serialize_attribute(v) for (k, v) in item._attributes.items()
167+
k: serialize_attribute(v) for (k, v) in item["attributes"].items()
180168
}
169+
else:
170+
del res["attributes"]
181171

182172
return res
183173

@@ -201,7 +191,7 @@ def _flush(self, only_pending: bool = False) -> None:
201191
if not spans:
202192
continue
203193

204-
dsc = spans[0]._dynamic_sampling_context()
194+
dsc = spans[0]["_segment_span"]._dynamic_sampling_context()
205195

206196
# Max per envelope is 1000, so if we happen to have more than
207197
# 1000 spans in one bucket, we'll need to separate them.

sentry_sdk/_types.py

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -317,6 +317,21 @@ class SDKInfo(TypedDict):
317317

318318
MetricProcessor = Callable[[Metric, Hint], Optional[Metric]]
319319

320+
SpanJSON = TypedDict(
321+
"SpanJSON",
322+
{
323+
"trace_id": str,
324+
"span_id": str,
325+
"parent_span_id": NotRequired[str],
326+
"name": str,
327+
"status": str,
328+
"is_segment": bool,
329+
"start_timestamp": float,
330+
"end_timestamp": NotRequired[float],
331+
"attributes": NotRequired[Attributes],
332+
},
333+
)
334+
320335
# TODO: Make a proper type definition for this (PRs welcome!)
321336
Breadcrumb = Dict[str, Any]
322337

sentry_sdk/client.py

Lines changed: 18 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
import random
44
import socket
55
from collections.abc import Mapping
6-
from copy import deepcopy
76
from datetime import datetime, timezone
87
from importlib import import_module
98
from typing import TYPE_CHECKING, List, Dict, cast, overload
@@ -955,50 +954,50 @@ def _capture_telemetry(
955954

956955
if ty == "log":
957956
before_send = get_before_send_log(self.options)
958-
snapshot = telemetry
957+
serialized = telemetry
959958

960959
elif ty == "metric":
961960
before_send = get_before_send_metric(self.options)
962-
snapshot = telemetry
961+
serialized = telemetry
963962

964963
elif ty == "span":
965964
before_send = get_before_send_span(self.options)
966-
# We don't want to expose the actual underlying span in
967-
# before_send_span to not allow arbitrary edits. Expose a copy
968-
# instead.
969-
snapshot = deepcopy(telemetry)
965+
serialized = telemetry._to_json()
970966

971967
if before_send is not None:
972-
result = before_send(snapshot, {})
968+
serialized = before_send(serialized, {})
973969

974970
# Logs and metrics can be dropped in their respective
975971
# before_send, so if we get None, don't queue them for sending.
976972
if ty in ("log", "metric"):
977-
if result is None:
973+
if serialized is None:
978974
return
979975

980976
# Spans can't be dropped in before_send_span by design. They can
981-
# be altered though (name and attributes can be changed, e.g. to
982-
# sanitize).
983-
#
984-
# If we get anything but a StreamedSpan back from before_send_span,
985-
# just ignore it. Otherwise, take the returned StreamedSpan and
986-
# merge it with the original.
977+
# be altered though (e.g. to sanitize).
987978
elif ty == "span":
988-
if isinstance(result, StreamedSpan):
989-
telemetry._attributes = result._attributes
990-
telemetry._name = result._name
979+
if isinstance(serialized, dict) and serialized:
980+
# TODO[ivana]: Figure out the merging/validation here
981+
pass
982+
else:
983+
serialized = telemetry._to_json()
984+
logger.debug(
985+
"[Tracing] Invalid return value from before_send_span. Using original span."
986+
)
991987

992988
batcher = None
993989
if ty == "log":
994990
batcher = self.log_batcher
991+
995992
elif ty == "metric":
996993
batcher = self.metrics_batcher
994+
997995
elif ty == "span":
996+
serialized["_segment_span"] = telemetry._segment
998997
batcher = self.span_batcher
999998

1000999
if batcher is not None:
1001-
batcher.add(telemetry) # type: ignore
1000+
batcher.add(serialized) # type: ignore
10021001

10031002
def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
10041003
self._capture_telemetry(log, "log", scope)

sentry_sdk/consts.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@ class CompressionAlgo(Enum):
4646
from typing_extensions import Literal, TypedDict
4747

4848
import sentry_sdk
49-
from sentry_sdk.traces import StreamedSpan
5049
from sentry_sdk._types import (
5150
BreadcrumbProcessor,
5251
ContinuousProfilerMode,
@@ -57,6 +56,7 @@ class CompressionAlgo(Enum):
5756
Log,
5857
Metric,
5958
ProfilerMode,
59+
SpanJSON,
6060
TracesSampler,
6161
TransactionProcessor,
6262
)
@@ -87,7 +87,7 @@ class CompressionAlgo(Enum):
8787
"trace_lifecycle": Optional[Literal["static", "stream"]],
8888
"ignore_spans": Optional[IgnoreSpansConfig],
8989
"before_send_span": Optional[
90-
Callable[[StreamedSpan, Hint], Optional[StreamedSpan]]
90+
Callable[[SpanJSON, Hint], Optional[SpanJSON]]
9191
],
9292
"suppress_asgi_chained_exceptions": Optional[bool],
9393
},

sentry_sdk/traces.py

Lines changed: 21 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@
4343
Union,
4444
)
4545

46-
from sentry_sdk._types import Attributes, AttributeValue
46+
from sentry_sdk._types import Attributes, AttributeValue, SpanJSON
4747
from sentry_sdk.profiler.continuous_profiler import ContinuousProfile
4848

4949
P = ParamSpec("P")
@@ -574,6 +574,26 @@ def _set_segment_attributes(self) -> None:
574574

575575
self.set_attribute("process.command_args", sys.argv)
576576

577+
def _to_json(self) -> "SpanJSON":
578+
res = {
579+
"trace_id": self.trace_id,
580+
"span_id": self.span_id,
581+
"name": self._name if self._name is not None else "<unlabeled span>",
582+
"status": self._status,
583+
"is_segment": self._is_segment(),
584+
"start_timestamp": self._start_timestamp.timestamp(),
585+
}
586+
587+
if self._end_timestamp:
588+
res["end_timestamp"] = self._end_timestamp.timestamp()
589+
590+
if self._parent_span_id:
591+
res["parent_span_id"] = self._parent_span_id
592+
593+
res["attributes"] = {k: v for k, v in self._attributes.items()}
594+
595+
return res
596+
577597

578598
class NoOpStreamedSpan(StreamedSpan):
579599
__slots__ = (

tests/tracing/test_span_streaming.py

Lines changed: 16 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -273,12 +273,12 @@ def traces_sampler(sampling_context):
273273

274274
def test_before_send_span_basic(sentry_init, capture_items):
275275
def before_send_span(span, hint):
276-
assert isinstance(span, StreamedSpan)
276+
assert isinstance(span, dict)
277277

278-
span.name = "Better span name"
279-
span.remove_attribute("drop")
280-
span.set_attribute("sanitize", "[Removed]")
281-
span.set_attribute("add", "new")
278+
span["name"] = "Better span name"
279+
del span["attributes"]["drop"]
280+
span["attributes"]["sanitize"] = "[Removed]"
281+
span["attributes"]["add"] = "new"
282282

283283
return span
284284

@@ -313,11 +313,17 @@ def before_send_span(span, hint):
313313
assert span["attributes"]["add"] == "new"
314314

315315

316-
def test_before_send_span_invalid_return_value(sentry_init, capture_items):
316+
@pytest.mark.parametrize(
317+
"return_value",
318+
[None, {}, {"not_a_span": True}],
319+
)
320+
def test_before_send_span_invalid_return_value(
321+
sentry_init, capture_items, return_value
322+
):
317323
def before_send_span(span, hint):
318324
# Spans can't be dropped in before_send_span, so unsupported return
319325
# values will be ignored
320-
return None
326+
return return_value
321327

322328
sentry_init(
323329
traces_sample_rate=1.0,
@@ -344,7 +350,9 @@ def before_send_span(span, hint):
344350
def test_before_send_span_unsupported_edit(sentry_init, capture_items):
345351
def before_send_span(span, hint):
346352
# Anything beyond attribute and name changes will be ignored
347-
span._trace_id = "my-trace-id"
353+
span["trace_id"] = "my-trace-id"
354+
355+
return span
348356

349357
sentry_init(
350358
traces_sample_rate=1.0,

0 commit comments

Comments
 (0)