Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions common/lib/metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import atexit
import logging
import os
import socket
Expand Down Expand Up @@ -94,12 +95,40 @@ def _init_otel_metrics():
metric_readers=[reader],
)

# OTel Python's metrics.set_meter_provider may have already been
# called by the parent process. Calling it again in this child
# process replaces the global with our fork-local provider,
# which is what we want here.
metrics.set_meter_provider(provider)
meter = metrics.get_meter(__name__)
# Bind ``meter`` to OUR provider directly, NOT via the global
# ``metrics.get_meter(...)`` API.
#
# Why: OTel Python's ``set_meter_provider()`` is single-call. If
# auto-instrumentation (``opentelemetry-instrumentation``) is also
# active in this process, it has typically registered its own
# MeterProvider before our lazy init runs. Subsequent
``set_meter_provider`` calls are rejected (OTel only logs a warning), and
# ``metrics.get_meter(__name__)`` returns the global proxy bound
# to the auto-instrumentation's provider — whose resource we don't
# control. The visible symptom: our user-provided resource
# attributes (``host.name``, ``service.instance.id``) never reach
# the metrics backend, so per-process series collapse onto one
# fingerprint at the storage layer.
#
# Binding directly via ``provider.get_meter(__name__)`` sidesteps
# the global entirely. Our ``meter`` exports through OUR provider's
# pipeline (with the correct resource); auto-instrumentation
# metrics continue to flow through their own pipeline. Both end up
# at the OTel collector independently.
meter = provider.get_meter(__name__)

# Flush our provider's metric reader on process exit. The
# auto-instrumentation MeterProvider registers its own atexit
# hook against the global; ours doesn't (we never set it
globally), so without this, up to ~5s of buffered metrics get
# dropped each time a worker exits cleanly (e.g. during a
# rolling deploy). Worker forks inherit the parent's atexit
# registrations; each child's own ``_init_otel_metrics`` then
# adds its OWN hook for its OWN provider. The inherited
# parent-provider hook is harmless in the child — the gRPC
# state it holds was copied at fork time and shutdown() on it
# is a no-op against a duplicated state.
atexit.register(provider.shutdown)

# Clear cached instrument handles inherited from the parent —
# they point at the parent's MeterProvider and exporter, neither
Expand Down
24 changes: 20 additions & 4 deletions common/tests/test_otel_fork_safe_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,26 @@ def test_first_call_in_a_process_initializes(self):
with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \
patch("lib.metrics.MeterProvider") as mock_mp, \
patch("lib.metrics.metrics.set_meter_provider") as mock_set, \
patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \
patch("lib.metrics.atexit.register") as mock_atexit, \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):

metrics._init_otel_metrics()

# MeterProvider was built, set globally, and we recorded the pid.
# MeterProvider was built and we recorded the pid.
assert mock_mp.called
assert mock_set.called
# We must NOT touch the global meter provider — auto-instrumentation
# may have already set it, and ``set_meter_provider`` is single-call.
# See the comment in ``_init_otel_metrics`` for the rationale.
assert not mock_set.called
# Meter must come from OUR provider, not the global proxy.
provider_instance = mock_mp.return_value
assert provider_instance.get_meter.called
assert metrics.meter is provider_instance.get_meter.return_value
assert metrics._otel_initialized_pid == os.getpid()
# Provider's shutdown is registered as an atexit hook so the
# PeriodicExportingMetricReader flushes its buffer on process
# exit — otherwise rolling deploys drop the last ~5s of metrics.
mock_atexit.assert_called_once_with(provider_instance.shutdown)

def test_second_call_in_same_pid_is_a_no_op(self):
from lib import metrics
Expand Down Expand Up @@ -72,13 +83,14 @@ def test_different_pid_triggers_reinit(self):
with patch("lib.metrics.OTLPMetricExporter") as mock_exp, \
patch("lib.metrics.MeterProvider") as mock_mp, \
patch("lib.metrics.metrics.set_meter_provider") as mock_set, \
patch("lib.metrics.metrics.get_meter", return_value=MagicMock()), \
patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"):

metrics._init_otel_metrics()

# Fresh provider built in this process.
assert mock_mp.called
# Same single-call discipline — never touch the global.
assert not mock_set.called
# Resource passed to MeterProvider contains a service.instance.id
# built from host + this pid.
kwargs = mock_mp.call_args.kwargs
Expand All @@ -87,6 +99,10 @@ def test_different_pid_triggers_reinit(self):
assert attrs["service.instance.id"] == f"{metrics.host_name}-pid-{os.getpid()}"
assert attrs["host.name"] == metrics.host_name

# Meter is bound to the new provider, not the global.
provider_instance = mock_mp.return_value
assert metrics.meter is provider_instance.get_meter.return_value

# Instrument caches inherited from the "parent" were cleared so
# they get rebuilt against the new MeterProvider.
assert metrics.counter_metrics == {}
Expand Down
Loading