From d5c6ca6fabbb6416cc459c0471a1ed3e375d7a95 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Wed, 20 May 2026 23:06:25 +0530 Subject: [PATCH] feat(metrics): emit conserver.vcons.inflight UpDownCounter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an OpenTelemetry UpDownCounter that tracks how many vCons are currently being processed — i.e. between BLPOP from the ingress list and the end of ``_handle_vcon``. +1 on entry, -1 in finally, so every exit path (success, early-return from ``before_processing``, chain exception → DLQ) symmetrically decrements. Attribute: ``chain.name`` only. ``vcon.uuid`` deliberately omitted to avoid the per-event cardinality problem. Also adds ``add_updown_counter`` helper to common/lib/metrics.py for this and any future UpDownCounter use, matching the lazy-init pattern of the existing ``increment_counter`` / ``record_histogram`` helpers. Co-Authored-By: Claude Opus 4.7 (1M context) --- common/lib/metrics.py | 38 +++++++ common/tests/test_inflight_counter.py | 155 ++++++++++++++++++++++++++ conserver/main.py | 9 +- 3 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 common/tests/test_inflight_counter.py diff --git a/common/lib/metrics.py b/common/lib/metrics.py index 6311384..ae6ee03 100644 --- a/common/lib/metrics.py +++ b/common/lib/metrics.py @@ -16,6 +16,7 @@ counter_metrics = {} histogram_metrics = {} observable_gauges = {} +updown_counter_metrics = {} _otel_initialized = False logger = logging.getLogger(__name__) @@ -122,6 +123,43 @@ def increment_counter(metric_name, value=1, attributes=None): logger.warning(f"Failed to publish counter metric to OpenTelemetry: {e}") +def add_updown_counter(metric_name, value=1, attributes=None): + """Add (positive or negative) to an OpenTelemetry UpDownCounter. + + Unlike ``increment_counter`` (monotonic), an UpDownCounter can go down. + Use for "currently active" or "in-flight" gauges where increment on + entry and decrement on exit is paired in a try/finally. + + Only publishes if OpenTelemetry endpoint is configured. Initializes + OpenTelemetry automatically on first call. + + Args: + metric_name: Name of the metric (e.g. ``"conserver.vcons.inflight"``) + value: Amount to add (positive or negative). Default: 1. + attributes: Dictionary of attributes/labels (default: None) + """ + _init_otel_metrics() + + if not OTEL_EXPORTER_OTLP_ENDPOINT or not meter: + return + + try: + if attributes is None: + attributes = {} + else: + attributes = attributes.copy() + + if metric_name not in updown_counter_metrics: + updown_counter_metrics[metric_name] = meter.create_up_down_counter( + name=metric_name, + description=f"UpDownCounter metric for {metric_name}", + ) + + updown_counter_metrics[metric_name].add(value, attributes=attributes) + except Exception as e: + logger.warning(f"Failed to publish up_down_counter metric to OpenTelemetry: {e}") + + def register_observable_gauge(metric_name, callback, description=None): """Register an OpenTelemetry observable gauge. diff --git a/common/tests/test_inflight_counter.py b/common/tests/test_inflight_counter.py new file mode 100644 index 0000000..f6b1ec6 --- /dev/null +++ b/common/tests/test_inflight_counter.py @@ -0,0 +1,155 @@ +"""Unit tests for ``add_updown_counter`` and the ``conserver.vcons.inflight`` +wiring in ``_handle_vcon``. + +The UpDownCounter must increment on entry to ``_handle_vcon`` and +decrement in the ``finally``, including when the chain raises (vCon +routed to DLQ) and when ``before_processing`` returns falsy (early +return). +""" + +from unittest.mock import MagicMock, patch + +import pytest + + +class TestUpDownCounterHelper: + """``add_updown_counter`` is the lazy-init OTel wrapper. Verify that + it creates the underlying instrument once and add() is called with + the right value/attributes.""" + + def test_first_call_creates_instrument(self): + from lib import metrics + # Reset module state so this test runs against a fresh meter + metrics.updown_counter_metrics.clear() + + fake_instr = MagicMock() + fake_meter = MagicMock() + fake_meter.create_up_down_counter.return_value = fake_instr + + with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \ + patch.object(metrics, "_otel_initialized", True), \ + patch.object(metrics, "meter", fake_meter): + metrics.add_updown_counter("conserver.vcons.inflight", 1, + attributes={"chain.name": "main"}) + + fake_meter.create_up_down_counter.assert_called_once_with( + name="conserver.vcons.inflight", + description="UpDownCounter metric for conserver.vcons.inflight", + ) + fake_instr.add.assert_called_once_with(1, attributes={"chain.name": "main"}) + + def test_second_call_reuses_instrument(self): + from lib import metrics + metrics.updown_counter_metrics.clear() + + fake_instr = MagicMock() + fake_meter = MagicMock() + fake_meter.create_up_down_counter.return_value = fake_instr + + with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake:4317"), \ + patch.object(metrics, "_otel_initialized", True), \ + patch.object(metrics, "meter", fake_meter): + metrics.add_updown_counter("conserver.vcons.inflight", 1) + metrics.add_updown_counter("conserver.vcons.inflight", -1) + + # Created once, added twice. + assert fake_meter.create_up_down_counter.call_count == 1 + assert fake_instr.add.call_count == 2 + # Values 1 then -1, attributes default to {} (not None). + assert fake_instr.add.call_args_list[0].args == (1,) + assert fake_instr.add.call_args_list[0].kwargs == {"attributes": {}} + assert fake_instr.add.call_args_list[1].args == (-1,) + + def test_no_op_when_endpoint_unset(self): + from lib import metrics + metrics.updown_counter_metrics.clear() + + with patch.object(metrics, "OTEL_EXPORTER_OTLP_ENDPOINT", None), \ + patch.object(metrics, "meter", None): + # Should be a silent no-op; no exception raised. + metrics.add_updown_counter("conserver.vcons.inflight", 1) + + assert "conserver.vcons.inflight" not in metrics.updown_counter_metrics + + +class TestHandleVconInflightTracking: + """``_handle_vcon`` wraps the chain run in +1/-1 updown_counter calls. + All exit paths (success, raise, early-return) must decrement.""" + + def _patches(self): + """Common patches: stub Redis-bound helpers, the hook, and the + chain-request class so we can drive ``_handle_vcon`` in isolation.""" + # We patch where the names are LOOKED UP — i.e. in conserver.main — + # not where they're defined. + return { + "retrieve_context": patch("main.retrieve_context", return_value={}), + "log_llen": patch("main.log_llen"), + "queue": patch("main.queue"), + "VconChainRequest": patch("main.VconChainRequest"), + "hook": patch("main.hook"), + } + + def _run_handle_vcon(self, before_processing_returns=True, + chain_raises=None): + from main import _handle_vcon + + with self._patches()["retrieve_context"], \ + self._patches()["log_llen"], \ + self._patches()["queue"] as mock_queue, \ + self._patches()["VconChainRequest"] as mock_req_cls, \ + self._patches()["hook"] as mock_hook, \ + patch("main.add_updown_counter") as mock_inflight: + + mock_hook.before_processing.return_value = before_processing_returns + req = MagicMock() + req.vcon_id = "vc1" + if chain_raises is not None: + req.process.side_effect = chain_raises + mock_req_cls.return_value = req + + try: + _handle_vcon( + worker_name="Worker-1", + ingress_list="transcribe", + vcon_id="vc1", + chain_details={"name": "transcription_chain", "links": []}, + ) + except Exception: + # _handle_vcon's outer try/except/finally is supposed to + # swallow chain errors, but we don't want a test crash to + # mask the inflight calls + pass + + return mock_inflight + + def test_inflight_incremented_then_decremented_on_success(self): + mock_inflight = self._run_handle_vcon(before_processing_returns=True) + + assert mock_inflight.call_count == 2 + # First call: +1 + assert mock_inflight.call_args_list[0].args[:2] == ("conserver.vcons.inflight", 1) + # Second call: -1 + assert mock_inflight.call_args_list[1].args[:2] == ("conserver.vcons.inflight", -1) + # Same chain.name attribute on both + attrs1 = mock_inflight.call_args_list[0].kwargs["attributes"] + attrs2 = mock_inflight.call_args_list[1].kwargs["attributes"] + assert attrs1 == attrs2 == {"chain.name": "transcription_chain"} + + def test_inflight_decremented_when_before_processing_returns_falsy(self): + """Early-return (license fail, etc.) must still decrement.""" + mock_inflight = self._run_handle_vcon(before_processing_returns=False) + + assert mock_inflight.call_count == 2 + assert mock_inflight.call_args_list[0].args[:2] == ("conserver.vcons.inflight", 1) + assert mock_inflight.call_args_list[1].args[:2] == ("conserver.vcons.inflight", -1) + + def test_inflight_decremented_when_chain_raises(self): + """Chain throwing must still decrement (DLQ path).""" + mock_inflight = self._run_handle_vcon( + before_processing_returns=True, + chain_raises=RuntimeError("link blew up"), + ) + + assert mock_inflight.call_count == 2 + assert mock_inflight.call_args_list[0].args[:2] == ("conserver.vcons.inflight", 1) + assert mock_inflight.call_args_list[1].args[:2] == ("conserver.vcons.inflight", -1) diff --git a/conserver/main.py b/conserver/main.py index dbc6292..be8ce96 100644 --- a/conserver/main.py +++ b/conserver/main.py @@ -38,7 +38,7 @@ from lib.queue import VconQueue from lib.tracing import init_tracing from lib.error_tracking import init_error_tracker -from lib.metrics import record_histogram, increment_counter +from lib.metrics import record_histogram, increment_counter, add_updown_counter from storage.base import Storage # OpenTelemetry trace context propagation @@ -729,6 +729,12 @@ def _handle_vcon( vcon_chain_request = VconChainRequest(chain_details, vcon_id, context) processing_error = None + # Track in-flight count via OTel UpDownCounter. +1 on entry, -1 in finally. + # Attribute by chain.name so the gauge groups load by chain. Per-vCon + # uuid is NOT included — it would explode cardinality (cf. CON-561 + # follow-up on the link histogram). + inflight_attrs = {"chain.name": chain_details["name"]} + add_updown_counter("conserver.vcons.inflight", 1, attributes=inflight_attrs) try: context = context or {} context["ingress_list"] = ingress_list @@ -750,6 +756,7 @@ def _handle_vcon( if VCON_DLQ_EXPIRY > 0: queue.set_vcon_ttl(vcon_id, VCON_DLQ_EXPIRY) finally: + add_updown_counter("conserver.vcons.inflight", -1, attributes=inflight_attrs) hook.after_processing( vcon_chain_request.vcon_id, chain_details,