Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

if TYPE_CHECKING:
from typing import Any, Callable, Optional
from sentry_sdk.traces import StreamedSpan
from sentry_sdk._types import SpanJSON


class SpanBatcher(Batcher["StreamedSpan"]):
class SpanBatcher(Batcher["SpanJSON"]):
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
# a bit of a buffer for spans that appear between the trigger to flush
# and actually flushing the buffer.
Expand All @@ -42,7 +42,7 @@
# by trace_id, so that we can then send the buckets each in its own
# envelope.
# trace_id -> span buffer
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
self._span_buffer: dict[str, list["SpanJSON"]] = defaultdict(list)
self._running_size: dict[str, int] = defaultdict(lambda: 0)
self._capture_func = capture_func
self._record_lost_func = record_lost_func
Expand Down Expand Up @@ -99,7 +99,7 @@
self._flush()
self._last_full_flush = time.monotonic()

def add(self, span: "StreamedSpan") -> None:
def add(self, span: "SpanJSON") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
# _add_to_envelope during flush, or _flush_event.wait/set) triggers
Expand All @@ -115,7 +115,7 @@
return None

with self._lock:
size = len(self._span_buffer[span.trace_id])
size = len(self._span_buffer[span["trace_id"]])
if size >= self.MAX_BEFORE_DROP:
self._record_lost_func(
reason="queue_overflow",
Expand All @@ -124,14 +124,15 @@
)
return None

self._span_buffer[span.trace_id].append(span)
self._running_size[span.trace_id] += self._estimate_size(span)
self._span_buffer[span["trace_id"]].append(span)
self._running_size[span["trace_id"]] += self._estimate_size(span)

if (
size + 1 >= self.MAX_BEFORE_FLUSH
or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
or self._running_size[span["trace_id"]]
>= self.MAX_BYTES_BEFORE_FLUSH
):
self._pending_flush.add(span.trace_id)
self._pending_flush.add(span["trace_id"])
notify = True
else:
notify = False
Expand All @@ -142,12 +143,12 @@
self._active.flag = False

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
def _estimate_size(item: "SpanJSON") -> int:
# Rough estimate of serialized span size that's quick to compute.
# 210 is the rough size of the payload without attributes, and then we
# estimate the attributes separately.
estimate = 210
for value in item._attributes.values():
for value in item["attributes"].values():

Check warning on line 151 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

_estimate_size raises KeyError when span lacks attributes

_estimate_size accesses item["attributes"].values() unconditionally, but 'attributes' is NotRequired in SpanJSON. The new before_send_span feature allows users to return a modified dict. While the intent is that invalid returns should fall back to the original span, the validation at client.py:979 only checks isinstance(serialized, dict) and serialized (non-empty). This allows dicts like {"not_a_span": True} to pass validation. When such a dict reaches _span_batcher.add(), it will raise KeyError in _estimate_size at line 151, causing the span to be dropped and potentially propagating the exception.
Comment thread
sentrivana marked this conversation as resolved.
estimate += 50

if isinstance(value, str):
Expand All @@ -158,26 +159,15 @@
return estimate

@staticmethod
def _to_transport_format(item: "StreamedSpan") -> "Any":
res: "dict[str, Any]" = {
"trace_id": item.trace_id,
"span_id": item.span_id,
"name": item._name if item._name is not None else "<unlabeled span>",
"status": item._status,
"is_segment": item._is_segment(),
"start_timestamp": item._start_timestamp.timestamp(),
}

if item._end_timestamp:
res["end_timestamp"] = item._end_timestamp.timestamp()

if item._parent_span_id:
res["parent_span_id"] = item._parent_span_id

if item._attributes:
def _to_transport_format(item: "SpanJSON") -> "Any":
res = {k: v for k, v in item.items() if k not in ("_segment_span",)}

if item.get("attributes"):
res["attributes"] = {
k: serialize_attribute(v) for (k, v) in item._attributes.items()
k: serialize_attribute(v) for (k, v) in item["attributes"].items()
}
else:
del res["attributes"]

Check warning on line 170 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[VPT-R9G] _estimate_size raises KeyError when span lacks attributes (additional location)

_estimate_size accesses item["attributes"].values() unconditionally, but 'attributes' is NotRequired in SpanJSON. The new before_send_span feature allows users to return a modified dict. While the intent is that invalid returns should fall back to the original span, the validation at client.py:979 only checks isinstance(serialized, dict) and serialized (non-empty). This allows dicts like {"not_a_span": True} to pass validation. When such a dict reaches _span_batcher.add(), it will raise KeyError in _estimate_size at line 151, causing the span to be dropped and potentially propagating the exception.

return res

Expand All @@ -201,7 +191,7 @@
if not spans:
continue

dsc = spans[0]._dynamic_sampling_context()
dsc = spans[0]["_segment_span"]._dynamic_sampling_context()

# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
Expand Down
15 changes: 15 additions & 0 deletions sentry_sdk/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,21 @@ class SDKInfo(TypedDict):

MetricProcessor = Callable[[Metric, Hint], Optional[Metric]]

class SpanJSON(TypedDict):
    """JSON-serializable representation of a streamed span.

    This is the dict shape handed to the experimental ``before_send_span``
    hook and buffered by the span batcher before being sent in an
    envelope.
    """

    trace_id: str
    span_id: str
    parent_span_id: NotRequired[str]
    name: str
    status: str
    is_segment: bool
    start_timestamp: float
    end_timestamp: NotRequired[float]
    attributes: NotRequired[Attributes]
    # Internal only: reference back to the segment StreamedSpan, attached
    # by the client before batching (to derive the dynamic sampling
    # context) and stripped again before transport. Never part of the
    # serialized payload. NOTE(review): declared here so type checkers
    # accept the client's `serialized["_segment_span"] = ...` assignment.
    _segment_span: NotRequired[Any]

# TODO: Make a proper type definition for this (PRs welcome!)
Breadcrumb = Dict[str, Any]

Expand Down
46 changes: 39 additions & 7 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
logger,
get_before_send_log,
get_before_send_metric,
get_before_send_span,
has_logs_enabled,
has_metrics_enabled,
)
from sentry_sdk.serializer import serialize
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing import trace
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.transport import (
Expand Down Expand Up @@ -71,7 +73,6 @@
from sentry_sdk.scope import Scope
from sentry_sdk.session import Session
from sentry_sdk.spotlight import SpotlightClient
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.transport import Transport, Item
from sentry_sdk._log_batcher import LogBatcher
from sentry_sdk._metrics_batcher import MetricsBatcher
Expand Down Expand Up @@ -938,34 +939,65 @@
ty: str,
scope: "Scope",
) -> None:
# Capture attributes-based telemetry (logs, metrics, spansV2)
"""
Capture attributes-based telemetry (logs, metrics, streamed spans).

Apply any attributes set on the scope to it, and run the user's
before_send_{telemetry} on it, if applicable.
"""
if telemetry is None:
return

scope.apply_to_telemetry(telemetry)

before_send = None

if ty == "log":
before_send = get_before_send_log(self.options)
serialized = telemetry

elif ty == "metric":
before_send = get_before_send_metric(self.options)
serialized = telemetry

if before_send is not None:
telemetry = before_send(telemetry, {}) # type: ignore
elif ty == "span":
before_send = get_before_send_span(self.options)
serialized = telemetry._to_json()

if telemetry is None:
return
if before_send is not None:
serialized = before_send(serialized, {})

# Logs and metrics can be dropped in their respective
# before_send, so if we get None, don't queue them for sending.
if ty in ("log", "metric"):
if serialized is None:
return

# Spans can't be dropped in before_send_span by design. They can
# be altered though (e.g. to sanitize).
elif ty == "span":
if isinstance(serialized, dict) and serialized:
# TODO[ivana]: Figure out the merging/validation here
pass
else:
serialized = telemetry._to_json()
logger.debug(
"[Tracing] Invalid return value from before_send_span. Using original span."
)

Check warning on line 986 in sentry_sdk/client.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[VPT-R9G] _estimate_size raises KeyError when span lacks attributes (additional location)

_estimate_size accesses item["attributes"].values() unconditionally, but 'attributes' is NotRequired in SpanJSON. The new before_send_span feature allows users to return a modified dict. While the intent is that invalid returns should fall back to the original span, the validation at client.py:979 only checks isinstance(serialized, dict) and serialized (non-empty). This allows dicts like {"not_a_span": True} to pass validation. When such a dict reaches _span_batcher.add(), it will raise KeyError in _estimate_size at line 151, causing the span to be dropped and potentially propagating the exception.

batcher = None
if ty == "log":
batcher = self.log_batcher

elif ty == "metric":
batcher = self.metrics_batcher

elif ty == "span":
serialized["_segment_span"] = telemetry._segment
batcher = self.span_batcher

if batcher is not None:
batcher.add(telemetry) # type: ignore
batcher.add(serialized) # type: ignore

def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
self._capture_telemetry(log, "log", scope)
Expand Down
4 changes: 4 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class CompressionAlgo(Enum):
Log,
Metric,
ProfilerMode,
SpanJSON,
TracesSampler,
TransactionProcessor,
)
Expand Down Expand Up @@ -85,6 +86,9 @@ class CompressionAlgo(Enum):
"before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
"trace_lifecycle": Optional[Literal["static", "stream"]],
"ignore_spans": Optional[IgnoreSpansConfig],
"before_send_span": Optional[
Callable[[SpanJSON, Hint], Optional[SpanJSON]]
],
"suppress_asgi_chained_exceptions": Optional[bool],
},
total=False,
Expand Down
22 changes: 21 additions & 1 deletion sentry_sdk/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
Union,
)

from sentry_sdk._types import Attributes, AttributeValue
from sentry_sdk._types import Attributes, AttributeValue, SpanJSON
from sentry_sdk.profiler.continuous_profiler import ContinuousProfile

P = ParamSpec("P")
Expand Down Expand Up @@ -574,6 +574,26 @@ def _set_segment_attributes(self) -> None:

self.set_attribute("process.command_args", sys.argv)

def _to_json(self) -> "SpanJSON":
    """Serialize this span into its ``SpanJSON`` dict representation.

    Always includes ``trace_id``, ``span_id``, ``name``, ``status``,
    ``is_segment``, ``start_timestamp`` and ``attributes``;
    ``end_timestamp`` and ``parent_span_id`` are added only when set.
    The returned dict is what the experimental ``before_send_span``
    hook receives and what the span batcher buffers.
    """
    res: "SpanJSON" = {
        "trace_id": self.trace_id,
        "span_id": self.span_id,
        # Spans can be created without a name; use a visible placeholder.
        "name": self._name if self._name is not None else "<unlabeled span>",
        "status": self._status,
        "is_segment": self._is_segment(),
        "start_timestamp": self._start_timestamp.timestamp(),
    }

    # Unfinished spans have no end timestamp yet.
    if self._end_timestamp:
        res["end_timestamp"] = self._end_timestamp.timestamp()

    # Root (segment) spans have no parent.
    if self._parent_span_id:
        res["parent_span_id"] = self._parent_span_id

    # Shallow copy so a user mutating the dict (e.g. in
    # before_send_span) cannot alter the live span's attributes.
    res["attributes"] = dict(self._attributes)

    return res


class NoOpStreamedSpan(StreamedSpan):
__slots__ = (
Expand Down
10 changes: 10 additions & 0 deletions sentry_sdk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
Metric,
SerializedAttributeValue,
)
from sentry_sdk.traces import StreamedSpan

P = ParamSpec("P")
R = TypeVar("R")
Expand Down Expand Up @@ -2111,6 +2112,15 @@ def get_before_send_metric(
)


def get_before_send_span(
    options: "Optional[dict[str, Any]]",
) -> "Optional[Callable[[SpanJSON, Hint], Optional[SpanJSON]]]":
    """Return the experimental ``before_send_span`` callback, if configured.

    The callback receives the span already serialized to its ``SpanJSON``
    dict form (not the live ``StreamedSpan`` -- see the experiments
    option declaration in ``consts.py``), may modify it, and returns it.
    NOTE(review): ``SpanJSON`` must be importable under ``TYPE_CHECKING``
    in this module for type checkers to resolve the annotation.

    :param options: The SDK options dict, or ``None`` if the SDK is not
        configured.
    :return: The configured callback, or ``None``.
    """
    if options is None:
        return None

    # Be defensive about a missing "_experiments" key rather than
    # raising KeyError on a minimal options dict.
    return options.get("_experiments", {}).get("before_send_span")


def format_attribute(val: "Any") -> "AttributeValue":
"""
Turn unsupported attribute value types into an AttributeValue.
Expand Down
Loading
Loading