Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions common/lib/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# Module-level OpenTelemetry state shared by the metric helpers in this file.
# NOTE(review): the helpers that use the next three dicts are outside this
# view; presumably they hold created instruments per metric name — confirm.
counter_metrics = {}
histogram_metrics = {}
observable_gauges = {}
# UpDownCounter instruments keyed by metric name. add_updown_counter creates
# an instrument the first time a name is seen and reuses it afterwards.
updown_counter_metrics = {}
# NOTE(review): presumably flipped once OpenTelemetry setup has run — confirm
# against _init_otel_metrics, which is not visible here.
_otel_initialized = False

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -122,6 +123,43 @@ def increment_counter(metric_name, value=1, attributes=None):
logger.warning(f"Failed to publish counter metric to OpenTelemetry: {e}")


def add_updown_counter(metric_name, value=1, attributes=None):
    """Apply a signed delta to a named OpenTelemetry UpDownCounter.

    An UpDownCounter, unlike the monotonic counter behind
    ``increment_counter``, may decrease. It suits "currently active" or
    "in-flight" style measurements: add on entry, subtract on exit, with
    the two calls paired through a try/finally at the call site.

    ``_init_otel_metrics()`` is invoked on every call before anything
    else. Nothing is published unless an OTLP endpoint is configured and
    a meter is available. Any exception raised while preparing or
    publishing the measurement is logged as a warning and swallowed, so
    this helper does not raise from that section.

    Args:
        metric_name: Name of the metric (e.g. ``"conserver.vcons.inflight"``)
        value: Amount to add; may be positive or negative. Default: 1.
        attributes: Dictionary of attributes/labels (default: None). A
            copy is passed on, never the caller's own object.
    """
    _init_otel_metrics()

    # Metrics are optional: with no endpoint or no meter there is nowhere
    # to publish, so this becomes a silent no-op.
    if not (OTEL_EXPORTER_OTLP_ENDPOINT and meter):
        return

    try:
        # Pass on a private dict so the caller's mapping is never shared.
        labels = {} if attributes is None else attributes.copy()

        try:
            instrument = updown_counter_metrics[metric_name]
        except KeyError:
            # First use of this metric name: create the instrument and
            # remember it so later calls reuse it.
            instrument = meter.create_up_down_counter(
                name=metric_name,
                description=f"UpDownCounter metric for {metric_name}",
            )
            updown_counter_metrics[metric_name] = instrument

        instrument.add(value, attributes=labels)
    except Exception as e:
        logger.warning(f"Failed to publish up_down_counter metric to OpenTelemetry: {e}")


def register_observable_gauge(metric_name, callback, description=None):
"""Register an OpenTelemetry observable gauge.

Expand Down
155 changes: 155 additions & 0 deletions common/tests/test_inflight_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Unit tests for ``add_updown_counter`` and the ``conserver.vcons.inflight``
wiring in ``_handle_vcon``.

The UpDownCounter must increment on entry to ``_handle_vcon`` and
decrement in the ``finally``, including when the chain raises (vCon
routed to DLQ) and when ``before_processing`` returns falsy (early
return).
"""

from unittest.mock import MagicMock, patch

import pytest


class TestUpDownCounterHelper:
    """Tests for ``add_updown_counter``, the lazy-init OTel wrapper.

    Verifies that the underlying instrument is created once per metric
    name and that ``add()`` receives the expected value and attributes.

    Every test runs under ``patch.dict(metrics.updown_counter_metrics,
    clear=True)``: the instrument cache starts empty and its previous
    contents are restored on exit, so a ``MagicMock`` instrument created
    here cannot leak into other tests through the module-global dict.
    """

    def test_first_call_creates_instrument(self):
        from lib import metrics

        fake_instr = MagicMock()
        fake_meter = MagicMock()
        fake_meter.create_up_down_counter.return_value = fake_instr

        with patch.dict(metrics.updown_counter_metrics, clear=True), \
                patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \
                patch.object(metrics, "_otel_initialized", True), \
                patch.object(metrics, "meter", fake_meter):
            metrics.add_updown_counter("conserver.vcons.inflight", 1,
                                       attributes={"chain.name": "main"})

        fake_meter.create_up_down_counter.assert_called_once_with(
            name="conserver.vcons.inflight",
            description="UpDownCounter metric for conserver.vcons.inflight",
        )
        fake_instr.add.assert_called_once_with(1, attributes={"chain.name": "main"})

    def test_second_call_reuses_instrument(self):
        from lib import metrics

        fake_instr = MagicMock()
        fake_meter = MagicMock()
        fake_meter.create_up_down_counter.return_value = fake_instr

        with patch.dict(metrics.updown_counter_metrics, clear=True), \
                patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \
                patch.object(metrics, "_otel_initialized", True), \
                patch.object(metrics, "meter", fake_meter):
            metrics.add_updown_counter("conserver.vcons.inflight", 1)
            metrics.add_updown_counter("conserver.vcons.inflight", -1)

        # Created once, added twice.
        assert fake_meter.create_up_down_counter.call_count == 1
        assert fake_instr.add.call_count == 2
        # Values 1 then -1, attributes default to {} (not None) on both calls.
        assert fake_instr.add.call_args_list[0].args == (1,)
        assert fake_instr.add.call_args_list[0].kwargs == {"attributes": {}}
        assert fake_instr.add.call_args_list[1].args == (-1,)
        assert fake_instr.add.call_args_list[1].kwargs == {"attributes": {}}

    def test_no_op_when_endpoint_unset(self):
        from lib import metrics

        with patch.dict(metrics.updown_counter_metrics, clear=True), \
                patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", None), \
                patch.object(metrics, "meter", None):
            # Should be a silent no-op; no exception raised.
            metrics.add_updown_counter("conserver.vcons.inflight", 1)

            # Checked inside the ``with`` block: once patch.dict exits, the
            # cache holds its pre-test contents again, which says nothing
            # about what this call did.
            assert "conserver.vcons.inflight" not in metrics.updown_counter_metrics


class TestHandleVconInflightTracking:
    """Tests for the in-flight counter wiring in ``_handle_vcon``.

    ``_handle_vcon`` is expected to call ``add_updown_counter`` with +1
    before running the chain and with -1 afterwards. The decrement must
    happen on every exit path: success, a raising chain, and an early
    return when ``before_processing`` is falsy.
    """

    def _patches(self):
        """Build the patchers that isolate ``_handle_vcon``.

        Returns a dict of un-started ``patch`` objects, keyed by the
        attribute they replace. Targets are patched in the ``main``
        module namespace, i.e. where ``_handle_vcon`` looks the names
        up, not where they are defined.
        """
        stubs = {"retrieve_context": patch("main.retrieve_context", return_value={})}
        for attr in ("log_llen", "queue", "VconChainRequest", "hook"):
            stubs[attr] = patch(f"main.{attr}")
        return stubs

    def _run_handle_vcon(self, before_processing_returns=True,
                         chain_raises=None):
        """Run ``_handle_vcon`` once and return the ``add_updown_counter`` mock.

        Args:
            before_processing_returns: Return value configured on
                ``hook.before_processing``.
            chain_raises: Optional exception set as the side effect of
                the chain request's ``process``.
        """
        from main import _handle_vcon

        stubs = self._patches()
        with stubs["retrieve_context"], \
                stubs["log_llen"], \
                stubs["queue"], \
                stubs["VconChainRequest"] as request_cls, \
                stubs["hook"] as hook_stub, \
                patch("main.add_updown_counter") as inflight:

            chain_request = MagicMock()
            chain_request.vcon_id = "vc1"
            if chain_raises is not None:
                chain_request.process.side_effect = chain_raises
            request_cls.return_value = chain_request
            hook_stub.before_processing.return_value = before_processing_returns

            try:
                _handle_vcon(
                    worker_name="Worker-1",
                    ingress_list="transcribe",
                    vcon_id="vc1",
                    chain_details={"name": "transcription_chain", "links": []},
                )
            except Exception:
                # _handle_vcon is supposed to swallow chain errors itself;
                # if one escapes anyway, keep going so the assertions on
                # the inflight calls still run instead of the test crashing.
                pass

        return inflight

    @staticmethod
    def _assert_entry_exit_pair(inflight):
        """Assert exactly one +1 call followed by one -1 call; return both."""
        assert inflight.call_count == 2
        entered, exited = inflight.call_args_list
        assert entered.args[:2] == ("conserver.vcons.inflight", 1)
        assert exited.args[:2] == ("conserver.vcons.inflight", -1)
        return entered, exited

    def test_inflight_incremented_then_decremented_on_success(self):
        inflight = self._run_handle_vcon(before_processing_returns=True)

        entered, exited = self._assert_entry_exit_pair(inflight)
        # Both calls carry the same chain.name attribute.
        entry_attrs = entered.kwargs["attributes"]
        exit_attrs = exited.kwargs["attributes"]
        assert entry_attrs == exit_attrs == {"chain.name": "transcription_chain"}

    def test_inflight_decremented_when_before_processing_returns_falsy(self):
        """Early-return (license fail, etc.) must still decrement."""
        inflight = self._run_handle_vcon(before_processing_returns=False)

        self._assert_entry_exit_pair(inflight)

    def test_inflight_decremented_when_chain_raises(self):
        """Chain throwing must still decrement (DLQ path)."""
        inflight = self._run_handle_vcon(
            before_processing_returns=True,
            chain_raises=RuntimeError("link blew up"),
        )

        self._assert_entry_exit_pair(inflight)
9 changes: 8 additions & 1 deletion conserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from lib.queue import VconQueue
from lib.tracing import init_tracing
from lib.error_tracking import init_error_tracker
from lib.metrics import record_histogram, increment_counter
from lib.metrics import record_histogram, increment_counter, add_updown_counter
from storage.base import Storage

# OpenTelemetry trace context propagation
Expand Down Expand Up @@ -729,6 +729,12 @@ def _handle_vcon(

vcon_chain_request = VconChainRequest(chain_details, vcon_id, context)
processing_error = None
# Track in-flight count via OTel UpDownCounter. +1 on entry, -1 in finally.
# Attribute by chain.name so the gauge groups load by chain. Per-vCon
# uuid is NOT included — it would explode cardinality (cf. CON-561
# follow-up on the link histogram).
inflight_attrs = {"chain.name": chain_details["name"]}
add_updown_counter("conserver.vcons.inflight", 1, attributes=inflight_attrs)
try:
context = context or {}
context["ingress_list"] = ingress_list
Expand All @@ -750,6 +756,7 @@ def _handle_vcon(
if VCON_DLQ_EXPIRY > 0:
queue.set_vcon_ttl(vcon_id, VCON_DLQ_EXPIRY)
finally:
add_updown_counter("conserver.vcons.inflight", -1, attributes=inflight_attrs)
hook.after_processing(
vcon_chain_request.vcon_id,
chain_details,
Expand Down
Loading