diff --git a/common/lib/metrics.py b/common/lib/metrics.py index 1e34af9..9e9446d 100644 --- a/common/lib/metrics.py +++ b/common/lib/metrics.py @@ -1,3 +1,4 @@ +import atexit import logging import os import socket @@ -94,12 +95,40 @@ def _init_otel_metrics(): metric_readers=[reader], ) - # OTel Python's metrics.set_meter_provider may have already been - # called by the parent process. Calling it again in this child - # process replaces the global with our fork-local provider, - # which is what we want here. - metrics.set_meter_provider(provider) - meter = metrics.get_meter(__name__) + # Bind ``meter`` to OUR provider directly, NOT via the global + # ``metrics.get_meter(...)`` API. + # + # Why: OTel Python's ``set_meter_provider()`` is single-call. If + # auto-instrumentation (``opentelemetry-instrumentation``) is also + # active in this process, it has typically registered its own + # MeterProvider before our lazy init runs. Subsequent + # ``set_meter_provider`` calls are silently ignored, and + # ``metrics.get_meter(__name__)`` returns the global proxy bound + # to the auto-instrumentation's provider — whose resource we don't + # control. The visible symptom: our user-provided resource + # attributes (``host.name``, ``service.instance.id``) never reach + # the metrics backend, so per-process series collapse onto one + # fingerprint at the storage layer. + # + # Binding directly via ``provider.get_meter(__name__)`` sidesteps + # the global entirely. Our ``meter`` exports through OUR provider's + # pipeline (with the correct resource); auto-instrumentation + # metrics continue to flow through their own pipeline. Both end up + # at the OTel collector independently. + meter = provider.get_meter(__name__) + + # Flush our provider's metric reader on process exit. The + # auto-instrumentation MeterProvider registers its own atexit + # hook against the global; ours doesn't (we never set it + # globally), so without this, ~5s of in-buffer metrics get + # dropped each time a worker exits cleanly (e.g. during a + # rolling deploy). Worker forks inherit the parent's atexit + # registrations; each child's own ``_init_otel_metrics`` then + # adds its OWN hook for its OWN provider. The inherited + # parent-provider hook is harmless in the child — the gRPC + # state it holds was copied at fork time and shutdown() on it + # is a no-op against a duplicated state. + atexit.register(provider.shutdown) # Clear cached instrument handles inherited from the parent — # they point at the parent's MeterProvider and exporter, neither diff --git a/common/tests/test_otel_fork_safe_init.py b/common/tests/test_otel_fork_safe_init.py index ad4b171..e27a9c5 100644 --- a/common/tests/test_otel_fork_safe_init.py +++ b/common/tests/test_otel_fork_safe_init.py @@ -32,15 +32,26 @@ def test_first_call_in_a_process_initializes(self): with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \ patch("lib.metrics.MeterProvider") as mock_mp, \ patch("lib.metrics.metrics.set_meter_provider") as mock_set, \ - patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \ + patch("lib.metrics.atexit.register") as mock_atexit, \ patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): metrics._init_otel_metrics() - # MeterProvider was built, set globally, and we recorded the pid. + # MeterProvider was built and we recorded the pid. assert mock_mp.called - assert mock_set.called + # We must NOT touch the global meter provider — auto-instrumentation + # may have already set it, and ``set_meter_provider`` is single-call. + # See the comment in ``_init_otel_metrics`` for the rationale. + assert not mock_set.called + # Meter must come from OUR provider, not the global proxy. + provider_instance = mock_mp.return_value + assert provider_instance.get_meter.called + assert metrics.meter is provider_instance.get_meter.return_value assert metrics._otel_initialized_pid == os.getpid() + # Provider's shutdown is registered as an atexit hook so the + # PeriodicExportingMetricReader flushes its buffer on process + # exit — otherwise rolling deploys drop the last ~5s of metrics. + mock_atexit.assert_called_once_with(provider_instance.shutdown) def test_second_call_in_same_pid_is_a_no_op(self): from lib import metrics @@ -72,13 +83,14 @@ def test_different_pid_triggers_reinit(self): with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \ patch("lib.metrics.MeterProvider") as mock_mp, \ patch("lib.metrics.metrics.set_meter_provider") as mock_set, \ - patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \ patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): metrics._init_otel_metrics() # Fresh provider built in this process. assert mock_mp.called + # Same single-call discipline — never touch the global. + assert not mock_set.called # Resource passed to MeterProvider contains a service.instance.id # built from host + this pid. kwargs = mock_mp.call_args.kwargs @@ -87,6 +99,10 @@ def test_different_pid_triggers_reinit(self): assert attrs["service.instance.id"] == f"{metrics.host_name}-pid-{os.getpid()}" assert attrs["host.name"] == metrics.host_name + # Meter is bound to the new provider, not the global. + provider_instance = mock_mp.return_value + assert metrics.meter is provider_instance.get_meter.return_value + # Instrument caches inherited from the "parent" were cleared so # they get rebuilt against the new MeterProvider. assert metrics.counter_metrics == {} diff --git a/conserver/main.py b/conserver/main.py index a09e170..c2cc145 100644 --- a/conserver/main.py +++ b/conserver/main.py @@ -236,32 +236,13 @@ def process(self) -> None: the overall processing flow. Will stop processing if any link indicates the chain should not continue. - Creates a new span from context if available for trace propagation (POC). + Always opens a vcon_processing root span so the link.* and + storage.* spans for this vcon nest under one trace. A span + link to the producer's trace is attached when self.context + carries one; otherwise the span is created without a link. """ - # Create span from context if available (POC) - use as context manager - if self.context: - self._span_context_manager = self._create_span_from_context() - if self._span_context_manager: - # Enter the context manager to make the span current - # This links the span to the parent trace - self._span = self._span_context_manager.__enter__() - # Verify trace linkage - if self._span: - span_ctx = self._span.get_span_context() - parent_trace_id = self.context.get("trace_id", "") - logger.info( - f"Span activated for vCon {self.vcon_id}: " - f"trace_id={format(span_ctx.trace_id, '032x')}, " - f"span_id={format(span_ctx.span_id, '016x')}, " - f"expected_trace_id={parent_trace_id}, " - f"match={format(span_ctx.trace_id, '032x') == parent_trace_id}" - ) - else: - self._span = None - self._span_context_manager = None - else: - self._span = None - self._span_context_manager = None + self._span_context_manager = self._create_span_from_context() + self._span = self._span_context_manager.__enter__() vcon_started = time.time() logger.info( @@ -306,68 +287,60 @@ def process(self) -> None: record_histogram("conserver.main_loop.vcon_processing_time", vcon_processing_time, attributes=chain_attrs) increment_counter("conserver.main_loop.count_vcons_processed", attributes=chain_attrs) - # End span if created - exit the context manager - if self._span_context_manager: - try: - current_span = trace.get_current_span() - if current_span: - current_span.set_status(Status(StatusCode.OK)) - # Exit the context manager which will end the span - self._span_context_manager.__exit__(None, None, None) - except Exception as e: - logger.debug(f"Failed to end span: {e}") + # End the vcon_processing span we opened at the top of run(). + try: + current_span = trace.get_current_span() + if current_span: + current_span.set_status(Status(StatusCode.OK)) + self._span_context_manager.__exit__(None, None, None) + except Exception as e: + logger.debug(f"Failed to end span: {e}") def _create_span_from_context(self): - """Create a new span from propagated trace context using span links. - - Since vCon processing is asynchronous (queued and processed later), - we use span links instead of parent-child relationships to represent - the causal relationship between the API request and the async processing. - + """Open a vcon_processing root span for this chain run. + + vCon processing is asynchronous (the producer enqueues, this + worker dequeues later), so the relationship to any upstream + trace is expressed as a span *link* rather than a parent-child + edge. A link is attached only when ``self.context`` carries a + valid trace_id/span_id pair — set by the producer via + ``store_context_*`` before enqueue. When no upstream context + is propagated (e.g. an adapter that doesn't speak OTel), the + span is created with no links so that ``link.*`` and + ``storage.*`` child spans still nest under a single root + instead of becoming orphan traces. + Returns: - The span context manager, or None if creation failed + The span context manager (never None). """ - if not self.context: - return None - - try: - tracer = trace.get_tracer(__name__) - - # Extract trace context from stored context - trace_id = int(self.context.get("trace_id", "0"), 16) - span_id = int(self.context.get("span_id", "0"), 16) - trace_flags = self.context.get("trace_flags", 0) - - # Create span context from the propagated values - parent_span_context = SpanContext( - trace_id=trace_id, - span_id=span_id, - is_remote=True, - trace_flags=TraceFlags(trace_flags) - ) - - # Create a new span with a link to the parent span context - # This represents an async relationship rather than a parent-child relationship - # The span will be in the same trace but linked rather than nested - span_context_manager = tracer.start_as_current_span( - f"vcon_processing.{self.chain_details['name']}", - links=[Link(parent_span_context)], - attributes={ - "vcon_id": self.vcon_id, - "chain_name": self.chain_details["name"], - "vcon.uuid": self.vcon_id - } - ) - - logger.debug( - f"Created linked span for vCon {self.vcon_id}: " - f"linked_trace_id={format(trace_id, '032x')}, linked_span_id={format(span_id, '016x')}" - ) - - return span_context_manager - except Exception as e: - logger.warning(f"Failed to create span from context for vCon {self.vcon_id}: {e}") - return None + tracer = trace.get_tracer(__name__) + links = [] + if self.context: + try: + trace_id = int(self.context.get("trace_id", "0"), 16) + span_id = int(self.context.get("span_id", "0"), 16) + trace_flags = self.context.get("trace_flags", 0) + if trace_id and span_id: + links.append(Link(SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=True, + trace_flags=TraceFlags(trace_flags), + ))) + except Exception as e: + logger.warning( + f"Failed to build parent link for vCon {self.vcon_id}: {e}" + ) + + return tracer.start_as_current_span( + f"vcon_processing.{self.chain_details['name']}", + links=links, + attributes={ + "vcon_id": self.vcon_id, + "chain_name": self.chain_details["name"], + "vcon.uuid": self.vcon_id, + }, + ) def _wrap_up(self) -> None: """Handle post-processing operations for the vCon.