Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions common/lib/metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import atexit
import logging
import os
import socket
Expand Down Expand Up @@ -94,12 +95,40 @@ def _init_otel_metrics():
metric_readers=[reader],
)

# OTel Python's metrics.set_meter_provider may have already been
# called by the parent process. Calling it again in this child
# process replaces the global with our fork-local provider,
# which is what we want here.
metrics.set_meter_provider(provider)
meter = metrics.get_meter(__name__)
# Bind ``meter`` to OUR provider directly, NOT via the global
# ``metrics.get_meter(...)`` API.
#
# Why: OTel Python's ``set_meter_provider()`` is single-call. If
# auto-instrumentation (``opentelemetry-instrumentation``) is also
# active in this process, it has typically registered its own
# MeterProvider before our lazy init runs. Subsequent
# ``set_meter_provider`` calls are silently ignored, and
# ``metrics.get_meter(__name__)`` returns the global proxy bound
# to the auto-instrumentation's provider — whose resource we don't
# control. The visible symptom: our user-provided resource
# attributes (``host.name``, ``service.instance.id``) never reach
# the metrics backend, so per-process series collapse onto one
# fingerprint at the storage layer.
#
# Binding directly via ``provider.get_meter(__name__)`` sidesteps
# the global entirely. Our ``meter`` exports through OUR provider's
# pipeline (with the correct resource); auto-instrumentation
# metrics continue to flow through their own pipeline. Both end up
# at the OTel collector independently.
meter = provider.get_meter(__name__)

# Flush our provider's metric reader on process exit. The
# auto-instrumentation MeterProvider registers its own atexit
# hook against the global; ours doesn't (we never set it
# globally), so without this, ~5s of in-buffer metrics get
# dropped each time a worker exits cleanly (e.g. during a
# rolling deploy). Worker forks inherit the parent's atexit
# registrations; each child's own ``_init_otel_metrics`` then
# adds its OWN hook for its OWN provider. The inherited
# parent-provider hook is harmless in the child — the gRPC
# state it holds was copied at fork time and shutdown() on it
# is a no-op against a duplicated state.
atexit.register(provider.shutdown)

# Clear cached instrument handles inherited from the parent —
# they point at the parent's MeterProvider and exporter, neither
Expand Down
24 changes: 20 additions & 4 deletions common/tests/test_otel_fork_safe_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,26 @@ def test_first_call_in_a_process_initializes(self):
with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \
patch("lib.metrics.MeterProvider") as mock_mp, \
patch("lib.metrics.metrics.set_meter_provider") as mock_set, \
patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \
patch("lib.metrics.atexit.register") as mock_atexit, \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):

metrics._init_otel_metrics()

# MeterProvider was built, set globally, and we recorded the pid.
# MeterProvider was built and we recorded the pid.
assert mock_mp.called
assert mock_set.called
# We must NOT touch the global meter provider — auto-instrumentation
# may have already set it, and ``set_meter_provider`` is single-call.
# See the comment in ``_init_otel_metrics`` for the rationale.
assert not mock_set.called
# Meter must come from OUR provider, not the global proxy.
provider_instance = mock_mp.return_value
assert provider_instance.get_meter.called
assert metrics.meter is provider_instance.get_meter.return_value
assert metrics._otel_initialized_pid == os.getpid()
# Provider's shutdown is registered as an atexit hook so the
# PeriodicExportingMetricReader flushes its buffer on process
# exit — otherwise rolling deploys drop the last ~5s of metrics.
mock_atexit.assert_called_once_with(provider_instance.shutdown)

def test_second_call_in_same_pid_is_a_no_op(self):
from lib import metrics
Expand Down Expand Up @@ -72,13 +83,14 @@ def test_different_pid_triggers_reinit(self):
with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \
patch("lib.metrics.MeterProvider") as mock_mp, \
patch("lib.metrics.metrics.set_meter_provider") as mock_set, \
patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):

metrics._init_otel_metrics()

# Fresh provider built in this process.
assert mock_mp.called
# Same single-call discipline — never touch the global.
assert not mock_set.called
# Resource passed to MeterProvider contains a service.instance.id
# built from host + this pid.
kwargs = mock_mp.call_args.kwargs
Expand All @@ -87,6 +99,10 @@ def test_different_pid_triggers_reinit(self):
assert attrs["service.instance.id"] == f"{metrics.host_name}-pid-{os.getpid()}"
assert attrs["host.name"] == metrics.host_name

# Meter is bound to the new provider, not the global.
provider_instance = mock_mp.return_value
assert metrics.meter is provider_instance.get_meter.return_value

# Instrument caches inherited from the "parent" were cleared so
# they get rebuilt against the new MeterProvider.
assert metrics.counter_metrics == {}
Expand Down
139 changes: 56 additions & 83 deletions conserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,32 +236,13 @@ def process(self) -> None:
the overall processing flow. Will stop processing if any link indicates
the chain should not continue.

Creates a new span from context if available for trace propagation (POC).
Always opens a vcon_processing root span so the link.* and
storage.* spans for this vcon nest under one trace. A span
link to the producer's trace is attached when self.context
carries one; otherwise the span is created without a link.
"""
# Create span from context if available (POC) - use as context manager
if self.context:
self._span_context_manager = self._create_span_from_context()
if self._span_context_manager:
# Enter the context manager to make the span current
# This links the span to the parent trace
self._span = self._span_context_manager.__enter__()
# Verify trace linkage
if self._span:
span_ctx = self._span.get_span_context()
parent_trace_id = self.context.get("trace_id", "")
logger.info(
f"Span activated for vCon {self.vcon_id}: "
f"trace_id={format(span_ctx.trace_id, '032x')}, "
f"span_id={format(span_ctx.span_id, '016x')}, "
f"expected_trace_id={parent_trace_id}, "
f"match={format(span_ctx.trace_id, '032x') == parent_trace_id}"
)
else:
self._span = None
self._span_context_manager = None
else:
self._span = None
self._span_context_manager = None
self._span_context_manager = self._create_span_from_context()
self._span = self._span_context_manager.__enter__()

vcon_started = time.time()
logger.info(
Expand Down Expand Up @@ -306,68 +287,60 @@ def process(self) -> None:
record_histogram("conserver.main_loop.vcon_processing_time", vcon_processing_time, attributes=chain_attrs)
increment_counter("conserver.main_loop.count_vcons_processed", attributes=chain_attrs)

# End span if created - exit the context manager
if self._span_context_manager:
try:
current_span = trace.get_current_span()
if current_span:
current_span.set_status(Status(StatusCode.OK))
# Exit the context manager which will end the span
self._span_context_manager.__exit__(None, None, None)
except Exception as e:
logger.debug(f"Failed to end span: {e}")
# End the vcon_processing span we opened at the top of run().
try:
current_span = trace.get_current_span()
if current_span:
current_span.set_status(Status(StatusCode.OK))
self._span_context_manager.__exit__(None, None, None)
except Exception as e:
logger.debug(f"Failed to end span: {e}")

def _create_span_from_context(self):
"""Create a new span from propagated trace context using span links.

Since vCon processing is asynchronous (queued and processed later),
we use span links instead of parent-child relationships to represent
the causal relationship between the API request and the async processing.

"""Open a vcon_processing root span for this chain run.

vCon processing is asynchronous (the producer enqueues, this
worker dequeues later), so the relationship to any upstream
trace is expressed as a span *link* rather than a parent-child
edge. A link is attached only when ``self.context`` carries a
valid trace_id/span_id pair — set by the producer via
``store_context_*`` before enqueue. When no upstream context
is propagated (e.g. an adapter that doesn't speak OTel), the
span is created with no links so that ``link.*`` and
``storage.*`` child spans still nest under a single root
instead of becoming orphan traces.

Returns:
The span context manager, or None if creation failed
The span context manager (never None).
"""
if not self.context:
return None

try:
tracer = trace.get_tracer(__name__)

# Extract trace context from stored context
trace_id = int(self.context.get("trace_id", "0"), 16)
span_id = int(self.context.get("span_id", "0"), 16)
trace_flags = self.context.get("trace_flags", 0)

# Create span context from the propagated values
parent_span_context = SpanContext(
trace_id=trace_id,
span_id=span_id,
is_remote=True,
trace_flags=TraceFlags(trace_flags)
)

# Create a new span with a link to the parent span context
# This represents an async relationship rather than a parent-child relationship
# The span will be in the same trace but linked rather than nested
span_context_manager = tracer.start_as_current_span(
f"vcon_processing.{self.chain_details['name']}",
links=[Link(parent_span_context)],
attributes={
"vcon_id": self.vcon_id,
"chain_name": self.chain_details["name"],
"vcon.uuid": self.vcon_id
}
)

logger.debug(
f"Created linked span for vCon {self.vcon_id}: "
f"linked_trace_id={format(trace_id, '032x')}, linked_span_id={format(span_id, '016x')}"
)

return span_context_manager
except Exception as e:
logger.warning(f"Failed to create span from context for vCon {self.vcon_id}: {e}")
return None
tracer = trace.get_tracer(__name__)
links = []
if self.context:
try:
trace_id = int(self.context.get("trace_id", "0"), 16)
span_id = int(self.context.get("span_id", "0"), 16)
trace_flags = self.context.get("trace_flags", 0)
if trace_id and span_id:
links.append(Link(SpanContext(
trace_id=trace_id,
span_id=span_id,
is_remote=True,
trace_flags=TraceFlags(trace_flags),
)))
except Exception as e:
logger.warning(
f"Failed to build parent link for vCon {self.vcon_id}: {e}"
)

return tracer.start_as_current_span(
f"vcon_processing.{self.chain_details['name']}",
links=links,
attributes={
"vcon_id": self.vcon_id,
"chain_name": self.chain_details["name"],
"vcon.uuid": self.vcon_id,
},
)

def _wrap_up(self) -> None:
"""Handle post-processing operations for the vCon.
Expand Down
Loading