From 712dd96e319a1686a77232194ed8b0f0690f5153 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Wed, 20 May 2026 23:57:05 +0530 Subject: [PATCH] fix(metrics): fork-safe OTel init + per-process service.instance.id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When conserver spawns multiple worker processes via multiprocessing, each fork inherits the parent's lib.metrics module state — including the already-initialized OTel SDK and the cached instrument handles. The OTel resource doesn't include any per-process attribute, so every forked worker emits metrics that share the same fingerprint at the collector. Downstream backends collapse those independent writes into a single series and the most recent worker's value overwrites the others. Visible effect: with WORKERS=2 and CONSERVER_VCON_CONCURRENCY=128, the new ``conserver.vcons.inflight`` UpDownCounter peaks at exactly 128 (one worker's pool) instead of the expected ~512 (2 pods × 2 workers × 128 capacity). Sum-across-pods queries can't recover the true cluster total. The same collision affects every conserver metric, not just inflight — every counter and histogram has been showing one-worker numbers, not cluster totals. Two-part fix: 1. Track init state by ``os.getpid()`` instead of a bare bool. A forked child has a different pid than the parent that initialized the module, so it re-enters the init path and rebuilds its own MeterProvider in the child's address space. The pid-based gate keeps the parent's single-call optimization intact. 2. Add ``service.instance.id = "-pid-"`` to the OTel resource so each forked worker has a unique fingerprint at the collector. Downstream queries can now ``spaceAggregation: sum`` across instances to get true cluster totals. Also clears the cached instrument handles on re-init in the child — they were created against the parent's MeterProvider and won't export correctly through the child's exporter. Tests: - New ``test_otel_fork_safe_init.py`` covers pid-based gate, re-init on pid change, instance.id resource attribute, and the endpoint-unset / exporter-failure paths. - Existing ``conftest.py`` and ``test_inflight_counter.py`` updated to patch the new ``_otel_initialized_pid`` (renamed from the old bare ``_otel_initialized``). Co-Authored-By: Claude Opus 4.7 (1M context) --- common/lib/metrics.py | 87 +++++++++++----- common/tests/test_inflight_counter.py | 5 +- common/tests/test_otel_fork_safe_init.py | 121 +++++++++++++++++++++++ conserver/tests/conftest.py | 10 +- 4 files changed, 194 insertions(+), 29 deletions(-) create mode 100644 common/tests/test_otel_fork_safe_init.py diff --git a/common/lib/metrics.py b/common/lib/metrics.py index ae6ee03..1e34af9 100644 --- a/common/lib/metrics.py +++ b/common/lib/metrics.py @@ -17,7 +17,12 @@ histogram_metrics = {} observable_gauges = {} updown_counter_metrics = {} -_otel_initialized = False +# Track init by pid (not a bare bool) so multiprocessing-fork children that +# inherit module state from the parent reinitialize their own MeterProvider. +# Without this, every forked worker shares the parent's resource fingerprint +# and SignOz collapses their independent UpDownCounter/Counter writes into +# one series — making cluster-wide aggregates impossible to compute. +_otel_initialized_pid = None logger = logging.getLogger(__name__) @@ -38,52 +43,82 @@ def stats_count(metric_name, value=1, tags=[]): def _init_otel_metrics(): - """Lazy initialization of OpenTelemetry metrics. - - This function is called automatically when OpenTelemetry metric methods - are first used. It only initializes if the endpoint is configured. + """Lazy + fork-safe initialization of OpenTelemetry metrics. + + Tracks the pid that last initialized. If the current process's pid + differs from that (i.e. we're in a forked child that inherited the + parent's module state), re-initialize so the child gets its own + MeterProvider with a unique ``service.instance.id``. + + Without this fork-safe init, every forked worker process shares the + parent's resource fingerprint, and the metrics backend collapses + their independent writes to a single series. With WORKERS=2 and + CONSERVER_VCON_CONCURRENCY=128, peak in-flight should reach ~256 per + pod, but the collapsed series only shows one worker's count (≤128). """ - global meter, _otel_initialized - - # Return early if already initialized or endpoint not configured - if _otel_initialized or not OTEL_EXPORTER_OTLP_ENDPOINT: + global meter, _otel_initialized_pid + current_pid = os.getpid() + + # Already initialized in this process — bail. + if _otel_initialized_pid == current_pid: return - + + if not OTEL_EXPORTER_OTLP_ENDPOINT: + # Remember we no-opped for this pid, so we don't keep re-checking. + _otel_initialized_pid = current_pid + return + try: - # Create resource with service name and host + # service.instance.id makes each process's series fingerprint + # distinct. Format: -pid- — host alone collapses + # multiple processes in the same pod into one fingerprint. + instance_id = f"{host_name}-pid-{current_pid}" + resource = Resource.create({ "service.name": OTEL_SERVICE_NAME, "host.name": host_name, + "service.instance.id": instance_id, }) - - # Create OTLP gRPC exporter + otlp_exporter = OTLPMetricExporter( endpoint=OTEL_EXPORTER_OTLP_ENDPOINT, ) - - # Create metric reader with exporter + reader = PeriodicExportingMetricReader( exporter=otlp_exporter, export_interval_millis=OTEL_METRIC_EXPORT_INTERVAL, ) - - # Create meter provider + provider = MeterProvider( resource=resource, metric_readers=[reader], ) - - # Set global meter provider + + # OTel Python's metrics.set_meter_provider may have already been + # called by the parent process. Calling it again in this child + # process replaces the global with our fork-local provider, + # which is what we want here. metrics.set_meter_provider(provider) - - # Get meter meter = metrics.get_meter(__name__) - _otel_initialized = True + + # Clear cached instrument handles inherited from the parent — + # they point at the parent's MeterProvider and exporter, neither + # of which run in this child. Recreate them lazily on next use + # against the new ``meter``. + counter_metrics.clear() + histogram_metrics.clear() + updown_counter_metrics.clear() + observable_gauges.clear() + + _otel_initialized_pid = current_pid + logger.info( + "OpenTelemetry metrics initialized: pid=%d, instance_id=%s", + current_pid, instance_id, + ) except Exception as e: - # Log error but don't fail initialization - logger = logging.getLogger(__name__) - logger.warning(f"Failed to initialize OpenTelemetry metrics: {e}") - _otel_initialized = True # Mark as initialized to avoid repeated attempts + logger.warning("Failed to initialize OpenTelemetry metrics: %s", e) + # Remember we tried for this pid; avoid retrying every call. + _otel_initialized_pid = current_pid def increment_counter(metric_name, value=1, attributes=None): diff --git a/common/tests/test_inflight_counter.py b/common/tests/test_inflight_counter.py index f6b1ec6..46d73e9 100644 --- a/common/tests/test_inflight_counter.py +++ b/common/tests/test_inflight_counter.py @@ -7,6 +7,7 @@ return). """ +import os from unittest.mock import MagicMock, patch import pytest @@ -27,7 +28,7 @@ def test_first_call_creates_instrument(self): fake_meter.create_up_down_counter.return_value = fake_instr with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \ - patch.object(metrics, "_otel_initialized", True), \ + patch.object(metrics, "_otel_initialized_pid", os.getpid()), \ patch.object(metrics, "meter", fake_meter): metrics.add_updown_counter("conserver.vcons.inflight", 1, attributes={"chain.name": "main"}) @@ -47,7 +48,7 @@ def test_second_call_reuses_instrument(self): fake_meter.create_up_down_counter.return_value = fake_instr with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \ - patch.object(metrics, "_otel_initialized", True), \ + patch.object(metrics, "_otel_initialized_pid", os.getpid()), \ patch.object(metrics, "meter", fake_meter): metrics.add_updown_counter("conserver.vcons.inflight", 1) metrics.add_updown_counter("conserver.vcons.inflight", -1) diff --git a/common/tests/test_otel_fork_safe_init.py b/common/tests/test_otel_fork_safe_init.py new file mode 100644 index 0000000..ad4b171 --- /dev/null +++ b/common/tests/test_otel_fork_safe_init.py @@ -0,0 +1,121 @@ +"""Unit tests for the fork-safe ``_init_otel_metrics`` behavior in +``common/lib/metrics.py``. + +The function tracks the pid that last initialized the OTel SDK in this +process. A different pid (i.e. we're in a forked child) triggers +re-initialization with a fresh ``service.instance.id``, a fresh meter, +and an emptied instrument cache. +""" + +import os +from unittest.mock import MagicMock, patch + +import pytest + + +def _reset_module_state(metrics_module): + """Put lib.metrics back into a pristine 'not yet initialized' state + so each test runs against the same baseline.""" + metrics_module._otel_initialized_pid = None + metrics_module.meter = None + metrics_module.counter_metrics.clear() + metrics_module.histogram_metrics.clear() + metrics_module.updown_counter_metrics.clear() + metrics_module.observable_gauges.clear() + + +class TestInitOtelMetricsForkSafe: + def test_first_call_in_a_process_initializes(self): + from lib import metrics + _reset_module_state(metrics) + + with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \ + patch("lib.metrics.MeterProvider") as mock_mp, \ + patch("lib.metrics.metrics.set_meter_provider") as mock_set, \ + patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \ + patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): + + metrics._init_otel_metrics() + + # MeterProvider was built, set globally, and we recorded the pid. + assert mock_mp.called + assert mock_set.called + assert metrics._otel_initialized_pid == os.getpid() + + def test_second_call_in_same_pid_is_a_no_op(self): + from lib import metrics + _reset_module_state(metrics) + + # Mark this pid as already initialized. + metrics._otel_initialized_pid = os.getpid() + + with patch("lib.metrics.MeterProvider") as mock_mp, \ + patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): + metrics._init_otel_metrics() + + # Skipped entirely — no new MeterProvider construction. + assert not mock_mp.called + + def test_different_pid_triggers_reinit(self): + """When a forked child inherits ``_otel_initialized_pid`` from + the parent, ``_init_otel_metrics`` MUST detect the pid mismatch + and rebuild the MeterProvider in the child's address space.""" + from lib import metrics + _reset_module_state(metrics) + + # Simulate having been initialized in a different (parent) pid. + metrics._otel_initialized_pid = os.getpid() + 1000 # any other pid + # Also seed the instrument caches as the parent would have. + metrics.counter_metrics["dummy"] = MagicMock() + metrics.updown_counter_metrics["dummy"] = MagicMock() + + with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \ + patch("lib.metrics.MeterProvider") as mock_mp, \ + patch("lib.metrics.metrics.set_meter_provider") as mock_set, \ + patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \ + patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): + + metrics._init_otel_metrics() + + # Fresh provider built in this process. + assert mock_mp.called + # Resource passed to MeterProvider contains a service.instance.id + # built from host + this pid. + kwargs = mock_mp.call_args.kwargs + resource = kwargs["resource"] + attrs = dict(resource.attributes) + assert attrs["service.instance.id"] == f"{metrics.host_name}-pid-{os.getpid()}" + assert attrs["host.name"] == metrics.host_name + + # Instrument caches inherited from the "parent" were cleared so + # they get rebuilt against the new MeterProvider. + assert metrics.counter_metrics == {} + assert metrics.updown_counter_metrics == {} + + # New pid is recorded. + assert metrics._otel_initialized_pid == os.getpid() + + def test_no_endpoint_recorded_as_initialized(self): + """If OTLP endpoint is not configured we still record the pid + as initialized, so subsequent calls bail without re-checking.""" + from lib import metrics + _reset_module_state(metrics) + + with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", None): + metrics._init_otel_metrics() + + assert metrics._otel_initialized_pid == os.getpid() + assert metrics.meter is None # never created + + def test_exporter_failure_still_marks_pid_initialized(self): + """If MeterProvider construction throws, we should still mark + this pid as 'tried' so we don't hot-loop trying every call.""" + from lib import metrics + _reset_module_state(metrics) + + with patch("lib.metrics.OTLPMetricExporter", + side_effect=RuntimeError("connection refused")), \ + patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"): + metrics._init_otel_metrics() + + assert metrics._otel_initialized_pid == os.getpid() diff --git a/conserver/tests/conftest.py b/conserver/tests/conftest.py index ea21e99..0d25424 100644 --- a/conserver/tests/conftest.py +++ b/conserver/tests/conftest.py @@ -1,3 +1,4 @@ +import os import pytest from unittest.mock import patch from opentelemetry.sdk.metrics import MeterProvider @@ -10,6 +11,11 @@ def metric_reader(): Yields the InMemoryMetricReader so tests can assert on emitted metrics. Each test gets a fresh meter with no prior state. + + Patches ``_otel_initialized_pid`` to the current pid so the + lazy-init helper bails out without trying to instantiate a real OTLP + exporter (which would clobber the in-memory meter and try to dial + a non-existent collector). """ reader = InMemoryMetricReader() provider = MeterProvider(metric_readers=[reader]) @@ -18,10 +24,12 @@ def metric_reader(): with patch.multiple( "lib.metrics", meter=test_meter, - _otel_initialized=True, + _otel_initialized_pid=os.getpid(), OTEL_EXPORTER_OTLP_ENDPOINT="http://test-collector:4317", counter_metrics={}, histogram_metrics={}, + updown_counter_metrics={}, + observable_gauges={}, ): yield reader