Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 61 additions & 26 deletions common/lib/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
histogram_metrics = {}
observable_gauges = {}
updown_counter_metrics = {}
_otel_initialized = False
# Track init by pid (not a bare bool) so multiprocessing-fork children that
# inherit module state from the parent reinitialize their own MeterProvider.
# Without this, every forked worker shares the parent's resource fingerprint
# and SignOz collapses their independent UpDownCounter/Counter writes into
# one series — making cluster-wide aggregates impossible to compute.
_otel_initialized_pid = None

logger = logging.getLogger(__name__)

Expand All @@ -38,52 +43,82 @@ def stats_count(metric_name, value=1, tags=[]):


def _init_otel_metrics():
"""Lazy initialization of OpenTelemetry metrics.

This function is called automatically when OpenTelemetry metric methods
are first used. It only initializes if the endpoint is configured.
"""Lazy + fork-safe initialization of OpenTelemetry metrics.

Tracks the pid that last initialized. If the current process's pid
differs from that (i.e. we're in a forked child that inherited the
parent's module state), re-initialize so the child gets its own
MeterProvider with a unique ``service.instance.id``.

Without this fork-safe init, every forked worker process shares the
parent's resource fingerprint, and the metrics backend collapses
their independent writes to a single series. With WORKERS=2 and
CONSERVER_VCON_CONCURRENCY=128, peak in-flight should reach ~256 per
pod, but the collapsed series only shows one worker's count (≤128).
"""
global meter, _otel_initialized

# Return early if already initialized or endpoint not configured
if _otel_initialized or not OTEL_EXPORTER_OTLP_ENDPOINT:
global meter, _otel_initialized_pid
current_pid = os.getpid()

# Already initialized in this process — bail.
if _otel_initialized_pid == current_pid:
return


if not OTEL_EXPORTER_OTLP_ENDPOINT:
# Remember we no-opped for this pid, so we don't keep re-checking.
_otel_initialized_pid = current_pid
return

try:
# Create resource with service name and host
# service.instance.id makes each process's series fingerprint
# distinct. Format: <hostname>-pid-<pid> — host alone collapses
# multiple processes in the same pod into one fingerprint.
instance_id = f"{host_name}-pid-{current_pid}"

resource = Resource.create({
"service.name": OTEL_SERVICE_NAME,
"host.name": host_name,
"service.instance.id": instance_id,
})

# Create OTLP gRPC exporter

otlp_exporter = OTLPMetricExporter(
endpoint=OTEL_EXPORTER_OTLP_ENDPOINT,
)

# Create metric reader with exporter

reader = PeriodicExportingMetricReader(
exporter=otlp_exporter,
export_interval_millis=OTEL_METRIC_EXPORT_INTERVAL,
)

# Create meter provider

provider = MeterProvider(
resource=resource,
metric_readers=[reader],
)

# Set global meter provider

# OTel Python's metrics.set_meter_provider may have already been
# called by the parent process. Calling it again in this child
# process replaces the global with our fork-local provider,
# which is what we want here.
metrics.set_meter_provider(provider)

# Get meter
meter = metrics.get_meter(__name__)
_otel_initialized = True

# Clear cached instrument handles inherited from the parent —
# they point at the parent's MeterProvider and exporter, neither
# of which run in this child. Recreate them lazily on next use
# against the new ``meter``.
counter_metrics.clear()
histogram_metrics.clear()
updown_counter_metrics.clear()
observable_gauges.clear()

_otel_initialized_pid = current_pid
logger.info(
"OpenTelemetry metrics initialized: pid=%d, instance_id=%s",
current_pid, instance_id,
)
except Exception as e:
# Log error but don't fail initialization
logger = logging.getLogger(__name__)
logger.warning(f"Failed to initialize OpenTelemetry metrics: {e}")
_otel_initialized = True # Mark as initialized to avoid repeated attempts
logger.warning("Failed to initialize OpenTelemetry metrics: %s", e)
# Remember we tried for this pid; avoid retrying every call.
_otel_initialized_pid = current_pid


def increment_counter(metric_name, value=1, attributes=None):
Expand Down
5 changes: 3 additions & 2 deletions common/tests/test_inflight_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
return).
"""

import os
from unittest.mock import MagicMock, patch

import pytest
Expand All @@ -27,7 +28,7 @@ def test_first_call_creates_instrument(self):
fake_meter.create_up_down_counter.return_value = fake_instr

with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \
patch.object(metrics, "_otel_initialized", True), \
patch.object(metrics, "_otel_initialized_pid", os.getpid()), \
patch.object(metrics, "meter", fake_meter):
metrics.add_updown_counter("conserver.vcons.inflight", 1,
attributes={"chain.name": "main"})
Expand All @@ -47,7 +48,7 @@ def test_second_call_reuses_instrument(self):
fake_meter.create_up_down_counter.return_value = fake_instr

with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \
patch.object(metrics, "_otel_initialized", True), \
patch.object(metrics, "_otel_initialized_pid", os.getpid()), \
patch.object(metrics, "meter", fake_meter):
metrics.add_updown_counter("conserver.vcons.inflight", 1)
metrics.add_updown_counter("conserver.vcons.inflight", -1)
Expand Down
121 changes: 121 additions & 0 deletions common/tests/test_otel_fork_safe_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Unit tests for the fork-safe ``_init_otel_metrics`` behavior in
``common/lib/metrics.py``.

The function tracks the pid that last initialized the OTel SDK in this
process. A different pid (i.e. we're in a forked child) triggers
re-initialization with a fresh ``service.instance.id``, a fresh meter,
and an emptied instrument cache.
"""

import os
from unittest.mock import MagicMock, patch

import pytest


def _reset_module_state(metrics_module):
"""Put lib.metrics back into a pristine 'not yet initialized' state
so each test runs against the same baseline."""
metrics_module._otel_initialized_pid = None
metrics_module.meter = None
metrics_module.counter_metrics.clear()
metrics_module.histogram_metrics.clear()
metrics_module.updown_counter_metrics.clear()
metrics_module.observable_gauges.clear()


class TestInitOtelMetricsForkSafe:
def test_first_call_in_a_process_initializes(self):
from lib import metrics
_reset_module_state(metrics)

with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \
patch("lib.metrics.MeterProvider") as mock_mp, \
patch("lib.metrics.metrics.set_meter_provider") as mock_set, \
patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):

metrics._init_otel_metrics()

# MeterProvider was built, set globally, and we recorded the pid.
assert mock_mp.called
assert mock_set.called
assert metrics._otel_initialized_pid == os.getpid()

def test_second_call_in_same_pid_is_a_no_op(self):
from lib import metrics
_reset_module_state(metrics)

# Mark this pid as already initialized.
metrics._otel_initialized_pid = os.getpid()

with patch("lib.metrics.MeterProvider") as mock_mp, \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):
metrics._init_otel_metrics()

# Skipped entirely — no new MeterProvider construction.
assert not mock_mp.called

def test_different_pid_triggers_reinit(self):
"""When a forked child inherits ``_otel_initialized_pid`` from
the parent, ``_init_otel_metrics`` MUST detect the pid mismatch
and rebuild the MeterProvider in the child's address space."""
from lib import metrics
_reset_module_state(metrics)

# Simulate having been initialized in a different (parent) pid.
metrics._otel_initialized_pid = os.getpid() + 1000 # any other pid
# Also seed the instrument caches as the parent would have.
metrics.counter_metrics["dummy"] = MagicMock()
metrics.updown_counter_metrics["dummy"] = MagicMock()

with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \
patch("lib.metrics.MeterProvider") as mock_mp, \
patch("lib.metrics.metrics.set_meter_provider") as mock_set, \
patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):

metrics._init_otel_metrics()

# Fresh provider built in this process.
assert mock_mp.called
# Resource passed to MeterProvider contains a service.instance.id
# built from host + this pid.
kwargs = mock_mp.call_args.kwargs
resource = kwargs["resource"]
attrs = dict(resource.attributes)
assert attrs["service.instance.id"] == f"{metrics.host_name}-pid-{os.getpid()}"
assert attrs["host.name"] == metrics.host_name

# Instrument caches inherited from the "parent" were cleared so
# they get rebuilt against the new MeterProvider.
assert metrics.counter_metrics == {}
assert metrics.updown_counter_metrics == {}

# New pid is recorded.
assert metrics._otel_initialized_pid == os.getpid()

def test_no_endpoint_recorded_as_initialized(self):
"""If OTLP endpoint is not configured we still record the pid
as initialized, so subsequent calls bail without re-checking."""
from lib import metrics
_reset_module_state(metrics)

with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", None):
metrics._init_otel_metrics()

assert metrics._otel_initialized_pid == os.getpid()
assert metrics.meter is None # never created

def test_exporter_failure_still_marks_pid_initialized(self):
"""If MeterProvider construction throws, we should still mark
this pid as 'tried' so we don't hot-loop trying every call."""
from lib import metrics
_reset_module_state(metrics)

with patch("lib.metrics.OTLPMetricExporter",
side_effect=RuntimeError("connection refused")), \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):
metrics._init_otel_metrics()

assert metrics._otel_initialized_pid == os.getpid()
10 changes: 9 additions & 1 deletion conserver/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import pytest
from unittest.mock import patch
from opentelemetry.sdk.metrics import MeterProvider
Expand All @@ -10,6 +11,11 @@ def metric_reader():

Yields the InMemoryMetricReader so tests can assert on emitted metrics.
Each test gets a fresh meter with no prior state.

Patches ``_otel_initialized_pid`` to the current pid so the
lazy-init helper bails out without trying to instantiate a real OTLP
exporter (which would clobber the in-memory meter and try to dial
a non-existent collector).
"""
reader = InMemoryMetricReader()
provider = MeterProvider(metric_readers=[reader])
Expand All @@ -18,10 +24,12 @@ def metric_reader():
with patch.multiple(
"lib.metrics",
meter=test_meter,
_otel_initialized=True,
_otel_initialized_pid=os.getpid(),
OTEL_EXPORTER_OTLP_ENDPOINT="http://test-collector:4317",
counter_metrics={},
histogram_metrics={},
updown_counter_metrics={},
observable_gauges={},
):
yield reader

Expand Down
Loading