Skip to content

Commit 8a9bf99

Browse files
feat(tracing): plug span-queue telemetry into merged next reliability
Wire OTel metrics into the post-merge span queue: enqueue/drop counters, batch coalescing depth/lag, drain phase timing, and export failure recording on permanent failure or exhausted retries. Preserves enqueued_at across re-enqueue for lag histograms. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 93dc835 commit 8a9bf99

3 files changed

Lines changed: 151 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
### Features
6+
7+
* **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``.
8+
39
## 0.11.4 (2026-05-26)
410

511
Full Changelog: [v0.11.3...v0.11.4](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.3...v0.11.4)

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from __future__ import annotations
22

33
import os
4+
import time
45
import asyncio
56
from enum import Enum
67
from dataclasses import dataclass
78

89
from agentex.types.span import Span
910
from agentex.lib.utils.logging import make_logger
11+
from agentex.lib.core.observability import tracing_metrics_recording as _metrics
1012
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
1113
AsyncTracingProcessor,
1214
)
@@ -67,6 +69,7 @@ class _SpanQueueItem:
6769
event_type: SpanEventType
6870
span: Span
6971
processors: list[AsyncTracingProcessor]
72+
enqueued_at: float | None = None
7073
# Number of times this item has already been dispatched. Used to bound
7174
# re-enqueue on transient failures.
7275
attempts: int = 0
@@ -134,6 +137,12 @@ def _record_drop(self, count: int, reason: str) -> None:
134137
if count <= 0:
135138
return
136139
self._dropped_spans += count
140+
if "shutdown" in reason:
141+
for _ in range(count):
142+
_metrics.record_span_dropped("shutdown")
143+
elif "queue full" in reason:
144+
for _ in range(count):
145+
_metrics.record_span_dropped("queue_full")
137146
# Warn on the first drop and then sparsely, so a drop storm is visible
138147
# without flooding the log.
139148
if self._dropped_spans == count or self._dropped_spans % 100 < count:
@@ -155,7 +164,15 @@ def enqueue(
155164
return
156165
self._ensure_drain_running()
157166
try:
158-
self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors))
167+
self._queue.put_nowait(
168+
_SpanQueueItem(
169+
event_type=event_type,
170+
span=span,
171+
processors=processors,
172+
enqueued_at=_metrics.monotonic_if_enabled(),
173+
)
174+
)
175+
_metrics.record_span_enqueued(event_type.value)
159176
except asyncio.QueueFull:
160177
self._record_drop(1, "queue full")
161178

@@ -197,16 +214,33 @@ async def _drain_loop(self) -> None:
197214
break
198215

199216
try:
217+
_metrics.record_batch_coalesced(
218+
queue_depth=self._queue.qsize() + len(batch),
219+
batch_items=batch,
220+
)
221+
200222
# Separate START and END events. Processing all STARTs before
201223
# ENDs ensures that on_span_start completes before on_span_end
202224
# for any span whose both events land in the same batch.
203225
starts = [i for i in batch if i.event_type == SpanEventType.START]
204226
ends = [i for i in batch if i.event_type == SpanEventType.END]
205227

206228
if starts:
229+
phase_start = time.perf_counter()
207230
await self._process_items(starts)
231+
_metrics.record_batch_phase(
232+
phase="start",
233+
size=len(starts),
234+
duration_ms=(time.perf_counter() - phase_start) * 1000.0,
235+
)
208236
if ends:
237+
phase_start = time.perf_counter()
209238
await self._process_items(ends)
239+
_metrics.record_batch_phase(
240+
phase="end",
241+
size=len(ends),
242+
duration_ms=(time.perf_counter() - phase_start) * 1000.0,
243+
)
210244
finally:
211245
for _ in batch:
212246
self._queue.task_done()
@@ -265,6 +299,12 @@ def _handle_failure(
265299
exhausted = len(items) - len(retriable)
266300
if exhausted:
267301
self._record_drop(exhausted, f"{type(p).__name__} retries exhausted during {event_type.value}")
302+
_metrics.record_export_failure(
303+
processor=p,
304+
event_type=event_type.value,
305+
span_count=exhausted,
306+
exc=exc,
307+
)
268308
for item in retriable:
269309
self._reenqueue(item, p)
270310
if retriable:
@@ -285,6 +325,12 @@ def _handle_failure(
285325
len(items),
286326
event_type.value,
287327
)
328+
_metrics.record_export_failure(
329+
processor=p,
330+
event_type=event_type.value,
331+
span_count=len(items),
332+
exc=exc,
333+
)
288334

289335
def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
290336
"""Put a single failed item back on the queue, scoped to the processor
@@ -295,6 +341,7 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
295341
event_type=item.event_type,
296342
span=item.span,
297343
processors=[p],
344+
enqueued_at=item.enqueued_at,
298345
attempts=item.attempts + 1,
299346
)
300347
)
@@ -312,9 +359,11 @@ async def shutdown(self, timeout: float = 30.0) -> None:
312359
try:
313360
await asyncio.wait_for(self._queue.join(), timeout=timeout)
314361
except asyncio.TimeoutError:
362+
remaining = self._queue.qsize()
315363
logger.warning(
316-
"Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize()
364+
"Span queue shutdown timed out after %.1fs with %d items remaining", timeout, remaining
317365
)
366+
_metrics.record_shutdown_timeout(remaining_items=remaining)
318367
if self._drain_task is not None and not self._drain_task.done():
319368
self._drain_task.cancel()
320369
try:

tests/lib/core/tracing/test_span_queue.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ async def capture_starts(spans: list[Span]) -> None:
401401
assert sum(len(b) for b in received) == 7
402402

403403

404+
404405
class _FakeHTTPError(Exception):
405406
"""Mimics an SGP/httpx status error: carries a ``status_code`` attribute."""
406407

@@ -630,3 +631,96 @@ async def capture_end(span: Span) -> None:
630631
# END should still carry output and end_time
631632
assert end_spans[0].output is not None
632633
assert end_spans[0].end_time is not None
634+
635+
636+
class TestAsyncSpanQueueMetrics:
637+
async def test_batch_coalesced_records_depth_including_batch(self, monkeypatch):
638+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
639+
import agentex.lib.core.observability.tracing_metrics_recording as recording
640+
641+
recording._metrics_enabled = None
642+
proc = _make_processor()
643+
queue = AsyncSpanQueue(linger_ms=0)
644+
recorded_depths: list[int] = []
645+
646+
def capture_coalesced(*, queue_depth: int, batch_items: object) -> None:
647+
recorded_depths.append(queue_depth)
648+
649+
with patch.object(recording, "record_batch_coalesced", side_effect=capture_coalesced):
650+
for _ in range(3):
651+
queue.enqueue(SpanEventType.START, _make_span(), [proc])
652+
await asyncio.sleep(0.05)
653+
await queue.shutdown()
654+
655+
assert recorded_depths, "expected at least one coalesced batch"
656+
assert recorded_depths[0] >= 3, (
657+
f"queue_depth should include batch items removed from queue, got {recorded_depths[0]}"
658+
)
659+
660+
async def test_enqueue_records_enqueued_metric(self, monkeypatch):
661+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
662+
import agentex.lib.core.observability.tracing_metrics_recording as recording
663+
664+
recording._metrics_enabled = None
665+
recording._tracing = None
666+
mock_metrics = MagicMock()
667+
proc = _make_processor()
668+
queue = AsyncSpanQueue()
669+
670+
with patch(
671+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
672+
return_value=mock_metrics,
673+
):
674+
queue.enqueue(SpanEventType.START, _make_span(), [proc])
675+
await asyncio.sleep(0.05)
676+
await queue.shutdown()
677+
678+
mock_metrics.span_events_enqueued.add.assert_any_call(1, {"event_type": "start"})
679+
680+
async def test_processor_failure_records_export_failure(self, monkeypatch):
681+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
682+
import agentex.lib.core.observability.tracing_metrics_recording as recording
683+
684+
recording._metrics_enabled = None
685+
recording._tracing = None
686+
mock_metrics = MagicMock()
687+
688+
class ExportError(Exception):
689+
pass
690+
691+
proc = AsyncMock()
692+
proc.on_spans_start = AsyncMock(side_effect=ExportError("Error code: 401 - denied"))
693+
proc.on_spans_end = AsyncMock()
694+
queue = AsyncSpanQueue()
695+
696+
with patch(
697+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
698+
return_value=mock_metrics,
699+
):
700+
queue.enqueue(SpanEventType.START, _make_span(), [proc])
701+
await asyncio.sleep(0.05)
702+
await queue.shutdown()
703+
704+
mock_metrics.export_batch_failures.add.assert_called_once()
705+
mock_metrics.export_span_failures.add.assert_called_once()
706+
707+
async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch):
708+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
709+
import agentex.lib.core.observability.tracing_metrics_recording as recording
710+
711+
recording._metrics_enabled = None
712+
recording._tracing = None
713+
proc = _make_processor()
714+
queue = AsyncSpanQueue()
715+
716+
with patch(
717+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics"
718+
) as mock_get:
719+
start = time.monotonic()
720+
for _ in range(200):
721+
queue.enqueue(SpanEventType.START, _make_span(), [proc])
722+
elapsed = time.monotonic() - start
723+
await queue.shutdown()
724+
725+
assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s"
726+
mock_get.assert_not_called()

0 commit comments

Comments
 (0)