From d679a4c6b7950729935f27949db34038f276f6d5 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Thu, 21 May 2026 01:01:56 +0530 Subject: [PATCH 1/2] fix(metrics): bind meter to our provider directly, bypass single-call global MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to vcon-dev/vcon-server#178. The fork-safe init landed, but SigNoz queries still showed only one worker's value (peak inflight 128 across 4 worker processes instead of the expected ~512). Root cause: the user-provided OTel resource (``host.name``, ``service.instance.id``) never reached the metrics backend. OpenTelemetry Python's ``metrics.set_meter_provider()`` is single-call. When ``opentelemetry-instrumentation`` (auto-instrumentation) is active in the process, it registers a default MeterProvider early in startup — before this module's lazy ``_init_otel_metrics()`` runs. Our subsequent ``set_meter_provider(our_provider)`` call is silently ignored, and ``metrics.get_meter(__name__)`` returns the global proxy bound to the auto-instrumentation's provider — whose resource we don't control. The visible ClickHouse (CH) symptom: ``resource_attrs`` on every conserver.* metric only contains ``service.name`` + ``telemetry.sdk.*`` + ``telemetry.auto.version``; ``host.name`` and ``service.instance.id`` that this module sets are dropped. All four worker processes collapse onto a single fingerprint, last-write-wins. Fix: bind ``meter`` to OUR provider directly via ``provider.get_meter(__name__)``, never touching the global. Our provider has the correct resource and its own export pipeline to the OTel collector. Auto-instrumentation metrics continue to flow through their own pipeline independently — both arrive at the collector. Tests updated to assert ``set_meter_provider`` is NOT called and the ``meter`` global is bound to the provider's own meter, not the global proxy. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- common/lib/metrics.py | 41 ++++++++++++++++++++---- common/tests/test_otel_fork_safe_init.py | 24 +++++++++++--- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/common/lib/metrics.py b/common/lib/metrics.py index 1e34af9..9e9446d 100644 --- a/common/lib/metrics.py +++ b/common/lib/metrics.py @@ -1,3 +1,4 @@ +import atexit import logging import os import socket @@ -94,12 +95,40 @@ def _init_otel_metrics(): metric_readers=[reader], ) - # OTel Python's metrics.set_meter_provider may have already been - # called by the parent process. Calling it again in this child - # process replaces the global with our fork-local provider, - # which is what we want here. - metrics.set_meter_provider(provider) - meter = metrics.get_meter(__name__) + # Bind ``meter`` to OUR provider directly, NOT via the global + # ``metrics.get_meter(...)`` API. + # + # Why: OTel Python's ``set_meter_provider()`` is single-call. If + # auto-instrumentation (``opentelemetry-instrumentation``) is also + # active in this process, it has typically registered its own + # MeterProvider before our lazy init runs. Subsequent + # ``set_meter_provider`` calls are silently ignored, and + # ``metrics.get_meter(__name__)`` returns the global proxy bound + # to the auto-instrumentation's provider — whose resource we don't + # control. The visible symptom: our user-provided resource + # attributes (``host.name``, ``service.instance.id``) never reach + # the metrics backend, so per-process series collapse onto one + # fingerprint at the storage layer. + # + # Binding directly via ``provider.get_meter(__name__)`` sidesteps + # the global entirely. Our ``meter`` exports through OUR provider's + # pipeline (with the correct resource); auto-instrumentation + # metrics continue to flow through their own pipeline. Both end up + # at the OTel collector independently. 
+ meter = provider.get_meter(__name__) + + # Flush our provider's metric reader on process exit. The + # auto-instrumentation MeterProvider registers its own atexit + # hook against the global; ours doesn't (we never set it + # globally), so without this, ~5s of in-buffer metrics get + # dropped each time a worker exits cleanly (e.g. during a + # rolling deploy). Worker forks inherit the parent's atexit + # registrations; each child's own ``_init_otel_metrics`` then + # adds its OWN hook for its OWN provider. The inherited + # parent-provider hook is harmless in the child — the gRPC + # state it holds was copied at fork time and shutdown() on it + # is a no-op against a duplicated state. + atexit.register(provider.shutdown) # Clear cached instrument handles inherited from the parent — # they point at the parent's MeterProvider and exporter, neither diff --git a/common/tests/test_otel_fork_safe_init.py b/common/tests/test_otel_fork_safe_init.py index ad4b171..e27a9c5 100644 --- a/common/tests/test_otel_fork_safe_init.py +++ b/common/tests/test_otel_fork_safe_init.py @@ -32,15 +32,26 @@ def test_first_call_in_a_process_initializes(self): with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \ patch("lib.metrics.MeterProvider") as mock_mp, \ patch("lib.metrics.metrics.set_meter_provider") as mock_set, \ - patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \ + patch("lib.metrics.atexit.register") as mock_atexit, \ patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): metrics._init_otel_metrics() - # MeterProvider was built, set globally, and we recorded the pid. + # MeterProvider was built and we recorded the pid. assert mock_mp.called - assert mock_set.called + # We must NOT touch the global meter provider — auto-instrumentation + # may have already set it, and ``set_meter_provider`` is single-call. + # See the comment in ``_init_otel_metrics`` for the rationale. 
+ assert not mock_set.called + # Meter must come from OUR provider, not the global proxy. + provider_instance = mock_mp.return_value + assert provider_instance.get_meter.called + assert metrics.meter is provider_instance.get_meter.return_value assert metrics._otel_initialized_pid == os.getpid() + # Provider's shutdown is registered as an atexit hook so the + # PeriodicExportingMetricReader flushes its buffer on process + # exit — otherwise rolling deploys drop the last ~5s of metrics. + mock_atexit.assert_called_once_with(provider_instance.shutdown) def test_second_call_in_same_pid_is_a_no_op(self): from lib import metrics @@ -72,13 +83,14 @@ def test_different_pid_triggers_reinit(self): with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \ patch("lib.metrics.MeterProvider") as mock_mp, \ patch("lib.metrics.metrics.set_meter_provider") as mock_set, \ - patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \ patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): metrics._init_otel_metrics() # Fresh provider built in this process. assert mock_mp.called + # Same single-call discipline — never touch the global. + assert not mock_set.called # Resource passed to MeterProvider contains a service.instance.id # built from host + this pid. kwargs = mock_mp.call_args.kwargs @@ -87,6 +99,10 @@ def test_different_pid_triggers_reinit(self): assert attrs["service.instance.id"] == f"{metrics.host_name}-pid-{os.getpid()}" assert attrs["host.name"] == metrics.host_name + # Meter is bound to the new provider, not the global. + provider_instance = mock_mp.return_value + assert metrics.meter is provider_instance.get_meter.return_value + # Instrument caches inherited from the "parent" were cleared so # they get rebuilt against the new MeterProvider. 
 assert metrics.counter_metrics == {} From b75d2af4e335b29be99a45ada8c62f3090803476 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Thu, 21 May 2026 12:08:40 +0530 Subject: [PATCH 2/2] fix(traces): always open vcon_processing root span, link if upstream context present MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the vcon_processing.{chain_name} span was only created when self.context carried an upstream trace context (extracted from Redis under context:{ingress}:{vcon_uuid}). When the producer didn't store a context — e.g. an adapter that ingests directly to the ingress list without OTel instrumentation — _create_span_from_context short-circuited to None and every link.* / storage.* span emitted during the chain run became its own root trace, defeating the trace-per-vcon model. Refactor _create_span_from_context to always return a span context manager. A span link to the producer's trace is attached only when trace_id and span_id parse to non-zero values; otherwise the span opens without links. Trace structure is preserved either way, and the link semantics still light up automatically once producers (e.g. the BDS adapter) begin propagating context via store_context_*. Net: -27 lines (139 changed, 56+/83-). Run-method setup collapses from a 25-line if/else dance to two lines; the POC trace-id verification log goes away. Co-Authored-By: Claude Opus 4.7 (1M context) --- conserver/main.py | 139 +++++++++++++++++++--------------------------- 1 file changed, 56 insertions(+), 83 deletions(-) diff --git a/conserver/main.py b/conserver/main.py index a09e170..c2cc145 100644 --- a/conserver/main.py +++ b/conserver/main.py @@ -236,32 +236,13 @@ def process(self) -> None: the overall processing flow. Will stop processing if any link indicates the chain should not continue. - Creates a new span from context if available for trace propagation (POC). 
+ Always opens a vcon_processing root span so the link.* and + storage.* spans for this vcon nest under one trace. A span + link to the producer's trace is attached when self.context + carries one; otherwise the span is created without a link. """ - # Create span from context if available (POC) - use as context manager - if self.context: - self._span_context_manager = self._create_span_from_context() - if self._span_context_manager: - # Enter the context manager to make the span current - # This links the span to the parent trace - self._span = self._span_context_manager.__enter__() - # Verify trace linkage - if self._span: - span_ctx = self._span.get_span_context() - parent_trace_id = self.context.get("trace_id", "") - logger.info( - f"Span activated for vCon {self.vcon_id}: " - f"trace_id={format(span_ctx.trace_id, '032x')}, " - f"span_id={format(span_ctx.span_id, '016x')}, " - f"expected_trace_id={parent_trace_id}, " - f"match={format(span_ctx.trace_id, '032x') == parent_trace_id}" - ) - else: - self._span = None - self._span_context_manager = None - else: - self._span = None - self._span_context_manager = None + self._span_context_manager = self._create_span_from_context() + self._span = self._span_context_manager.__enter__() vcon_started = time.time() logger.info( @@ -306,68 +287,60 @@ def process(self) -> None: record_histogram("conserver.main_loop.vcon_processing_time", vcon_processing_time, attributes=chain_attrs) increment_counter("conserver.main_loop.count_vcons_processed", attributes=chain_attrs) - # End span if created - exit the context manager - if self._span_context_manager: - try: - current_span = trace.get_current_span() - if current_span: - current_span.set_status(Status(StatusCode.OK)) - # Exit the context manager which will end the span - self._span_context_manager.__exit__(None, None, None) - except Exception as e: - logger.debug(f"Failed to end span: {e}") + # End the vcon_processing span we opened at the top of run(). 
+ try: + current_span = trace.get_current_span() + if current_span: + current_span.set_status(Status(StatusCode.OK)) + self._span_context_manager.__exit__(None, None, None) + except Exception as e: + logger.debug(f"Failed to end span: {e}") def _create_span_from_context(self): - """Create a new span from propagated trace context using span links. - - Since vCon processing is asynchronous (queued and processed later), - we use span links instead of parent-child relationships to represent - the causal relationship between the API request and the async processing. - + """Open a vcon_processing root span for this chain run. + + vCon processing is asynchronous (the producer enqueues, this + worker dequeues later), so the relationship to any upstream + trace is expressed as a span *link* rather than a parent-child + edge. A link is attached only when ``self.context`` carries a + valid trace_id/span_id pair — set by the producer via + ``store_context_*`` before enqueue. When no upstream context + is propagated (e.g. an adapter that doesn't speak OTel), the + span is created with no links so that ``link.*`` and + ``storage.*`` child spans still nest under a single root + instead of becoming orphan traces. + Returns: - The span context manager, or None if creation failed + The span context manager (never None). 
""" - if not self.context: - return None - - try: - tracer = trace.get_tracer(__name__) - - # Extract trace context from stored context - trace_id = int(self.context.get("trace_id", "0"), 16) - span_id = int(self.context.get("span_id", "0"), 16) - trace_flags = self.context.get("trace_flags", 0) - - # Create span context from the propagated values - parent_span_context = SpanContext( - trace_id=trace_id, - span_id=span_id, - is_remote=True, - trace_flags=TraceFlags(trace_flags) - ) - - # Create a new span with a link to the parent span context - # This represents an async relationship rather than a parent-child relationship - # The span will be in the same trace but linked rather than nested - span_context_manager = tracer.start_as_current_span( - f"vcon_processing.{self.chain_details['name']}", - links=[Link(parent_span_context)], - attributes={ - "vcon_id": self.vcon_id, - "chain_name": self.chain_details["name"], - "vcon.uuid": self.vcon_id - } - ) - - logger.debug( - f"Created linked span for vCon {self.vcon_id}: " - f"linked_trace_id={format(trace_id, '032x')}, linked_span_id={format(span_id, '016x')}" - ) - - return span_context_manager - except Exception as e: - logger.warning(f"Failed to create span from context for vCon {self.vcon_id}: {e}") - return None + tracer = trace.get_tracer(__name__) + links = [] + if self.context: + try: + trace_id = int(self.context.get("trace_id", "0"), 16) + span_id = int(self.context.get("span_id", "0"), 16) + trace_flags = self.context.get("trace_flags", 0) + if trace_id and span_id: + links.append(Link(SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=True, + trace_flags=TraceFlags(trace_flags), + ))) + except Exception as e: + logger.warning( + f"Failed to build parent link for vCon {self.vcon_id}: {e}" + ) + + return tracer.start_as_current_span( + f"vcon_processing.{self.chain_details['name']}", + links=links, + attributes={ + "vcon_id": self.vcon_id, + "chain_name": self.chain_details["name"], + 
"vcon.uuid": self.vcon_id, + }, + ) def _wrap_up(self) -> None: """Handle post-processing operations for the vCon.