diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..30194db --- /dev/null +++ b/.dockerignore @@ -0,0 +1,13 @@ +.git +.github +.tox +**/__pycache__ +**/*.pyc +.pytest_cache +.mypy_cache +htmlcov +.coverage +*.egg-info +.venv +dist +build diff --git a/README.md b/README.md index 751ec64..e1d6246 100644 --- a/README.md +++ b/README.md @@ -105,28 +105,85 @@ which supports various exporting mechanisms. Connectors are responsible for configuring the exporter of their choice; as well as adding more metrics if they chose to do so. -To do so, add these `opentelemetry-api` and `opentelemetry-sdk` packages -to the connector project. Depending on the exporter, another package such -as `opentelemetry-exporter-prometheus` (for Prometheus) is required. -The following is an example initialization code that enables a +Install the optional **telemetry** extra (see `requirements-telemetry.txt`) so +the SDK records real OpenTelemetry metrics. Without it, built-in metrics are +no-ops and the base package has no OpenTelemetry dependency: + +`pip install inorbit-edge[telemetry]` + +To export to Prometheus, the extra above includes `opentelemetry-exporter-prometheus` +and `prometheus-client`. The following is an example initialization code that enables a [Prometheus](https://prometheus.io/) HTTP endpoint, where all SDK metrics (including system metrics such as CPU usage) and any metric added by the connector can be scraped and exported to any external system (Grafana, StackDriver, etc.) -``` -from opentelemetry import metrics -from opentelemetry.exporter.prometheus import PrometheusMetricReader -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.resources import Resource +```python +from inorbit_edge.metrics import setup_prometheus_meter_provider from prometheus_client import start_http_server # ... 
-resource = Resource(attributes={"service.name": "my-connector"}) -# Note: Do not use "-" in the MetricsReader namefor GCP envs -metric_reader = PrometheusMetricReader("my_connector") -meter_provider = MeterProvider(metric_readers=[metric_reader], resource=resource) -metrics.set_meter_provider(meter_provider) -start_http_server(port=9464, addr="0.0.0.0") +if setup_prometheus_meter_provider( + service_name="my-connector", + service_instance_id="robot-123", + service_version="1.2.3", +): + start_http_server(port=9464, addr="0.0.0.0") +``` + +Custom metrics can use the same meter provider. Define instruments once during +module initialization, then record values where the connector does the work: + +```python +from inorbit_edge.metrics import get_meter + +meter = get_meter("my_connector") +messages_processed_counter = meter.create_counter( + "messages_processed", + unit="1", + description="Number of input messages processed by the connector", +) + + +def process_message(robot_id, message): + # ... connector-specific processing ... + messages_processed_counter.add(1, {"robot_id": robot_id}) +``` + +When exported to Prometheus with `service_name="my-connector"`, this appears as +`my_connector_messages_processed_total` with a `robot_id` label. Without the +`telemetry` extra installed, the same code is safe to run but records no data. + +For call-count metrics, the SDK also provides a decorator. This keeps the +increment close to the function being counted: + +```python +from inorbit_edge.metrics import get_meter, with_counter_metric + +meter = get_meter("my_connector") +command_handler_counter = meter.create_counter( + "command_handler_calls", + unit="1", + description="Number of command handler invocations", +) + + +@with_counter_metric(command_handler_counter, attributes={"command": "dock"}) +def handle_dock_command(command_payload): + # ... handle the command ... 
+ return "accepted" +``` + +If attributes depend on the function arguments, pass a callable instead of a +static dictionary: + +```python +@with_counter_metric( + command_handler_counter, + attributes=lambda robot_id, command_payload: {"robot_id": robot_id}, +) +def handle_command(robot_id, command_payload): + # ... handle the command ... + return "accepted" ``` diff --git a/inorbit_edge/metrics.py b/inorbit_edge/metrics.py index 2dc1781..3e33c77 100644 --- a/inorbit_edge/metrics.py +++ b/inorbit_edge/metrics.py @@ -4,28 +4,211 @@ # can be added by connectors to monitor their own operations, following these # examples. # -# In all cases, initialization code is necessary to export these metrics. -# For example, to export metrics from a connector through a Prometheus HTTP -# endpoint, add the following to your initialization code: +# To export these metrics over a Prometheus HTTP endpoint, call +# :func:`setup_prometheus_meter_provider` once during initialization, then +# start the HTTP server with ``prometheus_client.start_http_server``. 
For +# example: # -# from opentelemetry import metrics -# from opentelemetry.exporter.prometheus import PrometheusMetricReader -# from opentelemetry.sdk.metrics import MeterProvider -# from opentelemetry.sdk.resources import Resource +# from inorbit_edge.metrics import setup_prometheus_meter_provider # from prometheus_client import start_http_server # -# resource = Resource(attributes={"service.name": "my-connector"}) -# # Note: Do not use "-" in the MetricsReader namefor GCP envs -# metric_reader = PrometheusMetricReader("my_connector") -# meter_provider = MeterProvider(metric_readers=[metric_reader], resource=resource) -# metrics.set_meter_provider(meter_provider) -# start_http_server(port=prometheus_port, addr=prometheus_host) +# setup_prometheus_meter_provider( +# service_name="my-connector", +# service_instance_id="robot-123", +# service_version="1.2.3", +# ) +# start_http_server(port=9090, addr="0.0.0.0") +# +# The helper below wires the pieces as follows: +# +# * OpenTelemetry API (``opentelemetry.metrics``): the stable API used by SDK +# code to create meters and instruments such as counters. +# * OpenTelemetry SDK (``MeterProvider``): the runtime implementation that +# stores metric data and feeds it to configured metric readers/exporters. +# * ``Resource``: metadata attached to all exported metrics, for example +# service name, service instance, and version. +# * ``PrometheusMetricReader``: an OTEL reader that makes collected metric data +# available to the Prometheus client registry when Prometheus scrapes. +# * ``prometheus_client.start_http_server``: not called here; the connector or +# demo starts that HTTP server to expose the registry at ``/metrics``. +# +# When the optional ``telemetry`` extra is not installed, all instruments +# become no-ops and ``setup_prometheus_meter_provider`` returns False. 
# import functools +import inspect +import logging +import re + +from deprecated import deprecated + +logger = logging.getLogger(__name__) + + +class _NoOpInstrument: + def add(self, *_args, **_kwargs): + pass + + def record(self, *_args, **_kwargs): + pass + + def set(self, *_args, **_kwargs): + pass + + +class _NoOpMeter: + def create_counter(self, *_args, **_kwargs): + return _NoOpInstrument() + + def create_up_down_counter(self, *_args, **_kwargs): + return _NoOpInstrument() + + def create_histogram(self, *_args, **_kwargs): + return _NoOpInstrument() + + def create_gauge(self, *_args, **_kwargs): + return _NoOpInstrument() + + def create_observable_gauge(self, *_args, **_kwargs): + return _NoOpInstrument() + + def create_observable_counter(self, *_args, **_kwargs): + return _NoOpInstrument() + + def create_observable_up_down_counter(self, *_args, **_kwargs): + return _NoOpInstrument() + + +try: + # OTEL API package: lightweight surface used by the SDK to create meters. + # Importing this alone is enough for no-export metrics, but not enough to + # expose data to Prometheus; that needs the SDK provider and reader below. + from opentelemetry import metrics as _otel_metrics + + # Re-exported for connectors that define observable instruments. + from opentelemetry.metrics import Observation # noqa: F401 + + OTEL_API_AVAILABLE = True +except ImportError: # pragma: no cover - exercised when telemetry extra is missing + OTEL_API_AVAILABLE = False + Observation = None # type: ignore[assignment] # noqa: F401 + + +try: + # PrometheusMetricReader bridges OTEL SDK metrics into prometheus-client's + # registry. MeterProvider is the SDK runtime that owns readers. Resource + # carries service metadata exported as Prometheus target_info labels. 
+ from opentelemetry.exporter.prometheus import PrometheusMetricReader + from opentelemetry.sdk.metrics import MeterProvider as _SdkMeterProvider + from opentelemetry.sdk.resources import Resource + + PROMETHEUS_EXPORTER_AVAILABLE = True +except ImportError: # pragma: no cover + PROMETHEUS_EXPORTER_AVAILABLE = False + + +def get_meter(name): + """Return an OpenTelemetry Meter for ``name``. + + A Meter is the factory for instruments (counters, gauges, histograms). SDK + code records through instruments; exporter setup is intentionally separate + so importing the package does not force telemetry dependencies. + + When the ``telemetry`` extra is not installed, returns a no-op meter + whose instruments accept any call without raising. + """ + if OTEL_API_AVAILABLE: + return _otel_metrics.get_meter(name) + return _NoOpMeter() + + +def _sanitize_prometheus_prefix(prefix): + """Return a Prometheus-safe metric name prefix.""" + sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", prefix) + if sanitized and sanitized[0].isdigit(): + return f"_{sanitized}" + return sanitized + + +def setup_prometheus_meter_provider( + service_name, + service_instance_id, + service_version=None, + extra_resource_attributes=None, + exporter_namespace=None, +): + """Install a global OTEL MeterProvider with a Prometheus reader. + + This prepares OTEL metric collection but does not open a network port. + Call ``prometheus_client.start_http_server`` after this to serve the + Prometheus scrape endpoint. -from opentelemetry import metrics + Component roles: + * ``Resource``: service-level labels attached to all metrics. + * ``PrometheusMetricReader``: reads OTEL SDK metric data on scrape and + registers it with prometheus-client. + * ``MeterProvider``: the global OTEL SDK runtime used by meters returned + from ``get_meter``. -meter = metrics.get_meter("inorbit_edge_sdk") + OpenTelemetry permits only one provider per process; subsequent calls may + be ignored with a warning by the OTEL runtime. 
+ + Returns True when this provider became active. Returns False when the + OpenTelemetry / Prometheus exporter dependencies are not installed (in + which case all instrument calls become no-ops), or when OpenTelemetry kept + an existing provider instead. + + Args: + service_name: OTLP ``service.name`` resource attribute. Also used as + the default Prometheus metric name prefix. + service_instance_id: OTLP ``service.instance.id`` resource attribute. + Should be unique per process on a host. + service_version: optional OTLP ``service.version``. + extra_resource_attributes: optional dict of extra Resource attributes. + exporter_namespace: optional Prometheus metric name prefix. Defaults + to ``service_name``. + + The final Prometheus prefix is sanitized by replacing Prometheus-unsafe + characters with ``_``. + """ + if not (OTEL_API_AVAILABLE and PROMETHEUS_EXPORTER_AVAILABLE): + logger.info( + "Prometheus metrics provider not configured because telemetry " + "dependencies are missing. Install the 'telemetry' extra to " + "enable metrics export." + ) + return False + + attrs = { + "service.name": service_name, + "service.instance.id": service_instance_id, + } + if service_version: + attrs["service.version"] = service_version + if extra_resource_attributes: + attrs.update(extra_resource_attributes) + + # Resource attributes are exported as target_info labels. They identify + # which process/service emitted otherwise identical metric names. + resource = Resource.create(attrs) + + # The reader translates OTEL metric data into Prometheus metric families. + # ``prefix`` namespaces metric names, e.g. calls_publish_pose_total becomes + # my_connector_calls_publish_pose_total. + prefix = _sanitize_prometheus_prefix(exporter_namespace or service_name) + reader = PrometheusMetricReader(prefix=prefix) + + # The provider owns the reader and becomes the implementation behind the + # global OTEL API. Meters created via get_meter() record through it. 
+ provider = _SdkMeterProvider(metric_readers=[reader], resource=resource) + _otel_metrics.set_meter_provider(provider) + return _otel_metrics.get_meter_provider() is provider + + +# Module-level instruments. If telemetry is installed before this module is +# imported, these are real OTEL counters. Otherwise they are no-op counters so +# callers can use the SDK without installing or configuring OpenTelemetry. +meter = get_meter("inorbit_edge_sdk") publish_map_counter = meter.create_counter( "calls_publish_map", "1", "number of calls to publish maps" @@ -53,33 +236,83 @@ ) -def with_counter_metric(metric): +def attrs_from_self(*names): + """Build an attributes extractor for :func:`with_counter_metric` on methods. + + The returned callable reads each named attribute from the bound instance + (the first positional arg) and returns them as an OTEL attributes dict. + + Use this on instance methods to add per-call attributes that come from + the instance's own state, for example:: + + @with_counter_metric( + publish_pose_counter, attributes=attrs_from_self("robot_id") + ) + def publish_pose(self, ...): + ... + + Multiple attributes are supported:: + + attrs_from_self("robot_id", "session_id") + + Raises ``AttributeError`` at call time if any name is not an attribute of + the instance. """ - Decorator to count the number of calls to a function + + def _extract(self, *_args, **_kwargs): + return {name: getattr(self, name) for name in names} + + return _extract + + +def with_counter_metric(metric, attributes=None): + """Decorator: increment ``metric`` by 1 on every call. + + Works on sync and async functions (auto-detected). 
+ + attributes: + * ``None`` — no per-call attributes (identical to the original behavior) + * ``dict`` — static per-call attributes + * ``callable`` — invoked with the wrapped function's ``*args, **kwargs``; + must return a dict of attributes """ + def _resolve_attrs(args, kwargs): + if attributes is None: + return {} + if callable(attributes): + return attributes(*args, **kwargs) or {} + return dict(attributes) + def decorator(func): + if inspect.iscoroutinefunction(func): + + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + metric.add(1, _resolve_attrs(args, kwargs)) + return await func(*args, **kwargs) + + return async_wrapper + @functools.wraps(func) - def wrapper_decorator(*args, **kwargs): - metric.add(1) + def sync_wrapper(*args, **kwargs): + metric.add(1, _resolve_attrs(args, kwargs)) return func(*args, **kwargs) - return wrapper_decorator + return sync_wrapper return decorator +@deprecated( + version="2.0.2", + reason=("use with_counter_metric(), which now auto-detects async functions"), +) def with_counter_metric_async(metric): - """ - Decorator to count the number of calls to a function - """ - - def decorator(func): - @functools.wraps(func) - async def wrapper_decorator(*args, **kwargs): - metric.add(1) - return await func(*args, **kwargs) + """Deprecated alias for :func:`with_counter_metric`. - return wrapper_decorator + Prefer ``@with_counter_metric(...)``, which now detects async functions + automatically. 
+ """ - return decorator + return with_counter_metric(metric) diff --git a/inorbit_edge/robot.py b/inorbit_edge/robot.py index 61fe4fe..1ac15fa 100644 --- a/inorbit_edge/robot.py +++ b/inorbit_edge/robot.py @@ -9,6 +9,7 @@ from inorbit_edge import __version__ as inorbit_edge_version from inorbit_edge.types import Pose, SpatialTolerance from inorbit_edge.metrics import ( + attrs_from_self, with_counter_metric, publish_map_counter, publish_camera_frame_counter, @@ -888,7 +889,7 @@ def _handle_mapreq(self, msg): include_pixels=True, ) - @with_counter_metric(publish_map_counter) + @with_counter_metric(publish_map_counter, attributes=attrs_from_self("robot_id")) def publish_map( self, file, @@ -934,7 +935,9 @@ def publish_map( include_pixels=force_upload, ) - @with_counter_metric(publish_camera_frame_counter) + @with_counter_metric( + publish_camera_frame_counter, attributes=attrs_from_self("robot_id") + ) def publish_camera_frame(self, camera_id, image, width, height, ts): """Publishes a camera frame""" msg = CameraMessage() @@ -1264,7 +1267,7 @@ def publish_protobuf(self, subtopic, message, qos=0, retain=False): ) self.logger.debug("Return code: {}".format(ret)) - @with_counter_metric(publish_pose_counter) + @with_counter_metric(publish_pose_counter, attributes=attrs_from_self("robot_id")) def publish_pose(self, x, y, yaw, frame_id="map", ts=None): """Publish robot pose @@ -1322,7 +1325,10 @@ def reached_waypoint(self, waypoint: Pose, tolerance: SpatialTolerance): <= tolerance.angularRadians ) - @with_counter_metric(publish_key_values_counter) + @with_counter_metric( + publish_key_values_counter, + attributes=attrs_from_self("robot_id"), + ) def publish_key_values(self, key_values, custom_field="0", is_event=False): """Publish key value pairs @@ -1356,7 +1362,9 @@ def set_pairs(k): self.publish_protobuf(MQTT_SUBTOPIC_CUSTOM_DATA, msg) - @with_counter_metric(publish_system_stats_counter) + @with_counter_metric( + publish_system_stats_counter, 
attributes=attrs_from_self("robot_id") + ) def publish_system_stats( self, cpu_load_percentage=None, @@ -1389,7 +1397,10 @@ def publish_system_stats( self.publish_protobuf(MQTT_SUBTOPIC_SYSTEM_STATS, msg) - @with_counter_metric(publish_odometry_counter) + @with_counter_metric( + publish_odometry_counter, + attributes=attrs_from_self("robot_id"), + ) def publish_odometry( self, ts_start=None, @@ -1451,7 +1462,7 @@ def publish_odometry( msg.speed_available = True self.publish_protobuf(MQTT_SUBTOPIC_ODOMETRY, msg) - @with_counter_metric(publish_laser_counter) + @with_counter_metric(publish_laser_counter, attributes=attrs_from_self("robot_id")) def publish_lasers(self, x, y, yaw, ranges, frame_id="map", ts=None): """Publish an array of lasers. @@ -1504,7 +1515,6 @@ def publish_lasers(self, x, y, yaw, ranges, frame_id="map", ts=None): # Now publish all lasers self.publish_protobuf(MQTT_SUBTOPIC_POSE, msg) - @with_counter_metric(publish_laser_counter) def publish_laser(self, x, y, yaw, ranges, frame_id="map", ts=None): """Publish a single robot laser scan. @@ -1558,7 +1568,7 @@ def register_lasers(self, configs): retain=True, ) - @with_counter_metric(publish_path_counter) + @with_counter_metric(publish_path_counter, attributes=attrs_from_self("robot_id")) def publish_path( self, path_points, path_id="0", frame_id="map", ts=None, rdp_epsilon=0.001 ): diff --git a/inorbit_edge/tests/demo/Dockerfile b/inorbit_edge/tests/demo/Dockerfile new file mode 100644 index 0000000..09c5cb5 --- /dev/null +++ b/inorbit_edge/tests/demo/Dockerfile @@ -0,0 +1,36 @@ +# Build from repository root, e.g.: +# docker build -f inorbit_edge/tests/demo/Dockerfile -t inorbit-edge-sdk-demo . +# +# Run: mount this folder from the host (example, map, user_scripts). Example from repo root: +# docker run --rm -p 9464:9464 \ +# -v "$PWD/inorbit_edge/tests/demo:/demo:ro" \ +# -e INORBIT_URL=... \ +# -e INORBIT_API_URL=... \ +# -e INORBIT_API_KEY=... \ +# -e INORBIT_ACCOUNT_ID=... 
\ +# -e INORBIT_USE_SSL=true \ +# -e INORBIT_ROBOT_ID_PREFIX=$(hostname) \ +# inorbit-edge-sdk-demo + +FROM python:3.12-slim-bookworm + +WORKDIR /build + +COPY README.md \ + requirements.txt requirements-dev.txt \ + requirements-video.txt requirements-telemetry.txt \ + setup.py ./ + +COPY inorbit_edge/ ./inorbit_edge/ + +RUN pip install --no-cache-dir -U pip setuptools wheel && \ + pip install --no-cache-dir ".[video,telemetry]" + +# Demo assets (example.py, map.png, user_scripts/) are expected from a host mount at /demo +WORKDIR /demo + +# Default: export SDK metrics for Prometheus on 9464 (scrape /metrics on this port) +ENV INORBIT_METRICS_PORT=9464 +ENV INORBIT_METRICS_ADDR=0.0.0.0 + +CMD ["python", "example.py"] diff --git a/inorbit_edge/tests/demo/README.md b/inorbit_edge/tests/demo/README.md index b90446d..0431355 100644 --- a/inorbit_edge/tests/demo/README.md +++ b/inorbit_edge/tests/demo/README.md @@ -5,21 +5,63 @@ data to InOrbit. It also uses the InOrbit API for publishing map data (see `map. ## How to use -Export required environment variables and execute the `example.py` script. Use the `virtualenv` used on -the `CONTRIBUTING.md` guide. +From a virtual environment (see `CONTRIBUTING.md`), install the SDK with the video and telemetry extras from the +repository root, `cd` into this directory so paths such as `./user_scripts` resolve correctly, then set environment +variables and run `example.py`. 
```bash +cd /path/to/edge-sdk-python +pip install -e '.[video,telemetry]' +cd inorbit_edge/tests/demo + export INORBIT_URL="https://control.inorbit.ai" export INORBIT_API_URL="https://api.inorbit.ai" export INORBIT_API_KEY="foobar123" -# Set when using InOrbit connect (make sure to update the robot keys -# in the example config first through -# https://api.inorbit.ai/docs/index.html#operation/generateRobotKey) -export INORBIT_ROBOT_CONFIG_FILE=`pwd`/robots_config_example.yaml -# Disable SSL for local development only -export INORBIT_USE_SSL="true" +export INORBIT_ACCOUNT_ID="account123" +export INORBIT_ROBOT_ID_PREFIX="$(hostname)" +# TLS to the MQTT broker defaults to on. For a local broker without TLS: INORBIT_USE_SSL=false # Optionally enable video streaming as camera "0" export INORBIT_VIDEO_URL=/dev/video0 +# Optional: Prometheus scrape endpoint for SDK internal metrics (requires [telemetry]) +export INORBIT_METRICS_PORT=9464 +export INORBIT_METRICS_ADDR=0.0.0.0 + python example.py ``` + +Robot ids are always `${INORBIT_ROBOT_ID_PREFIX}_edgesdk_demo_0`, `${INORBIT_ROBOT_ID_PREFIX}_edgesdk_demo_1`, … The prefix is read from `INORBIT_ROBOT_ID_PREFIX` and is mandatory. + +With `INORBIT_METRICS_PORT` set, the demo configures OpenTelemetry early in startup, then serves metrics at +`http://$INORBIT_METRICS_ADDR:$INORBIT_METRICS_PORT/metrics` (e.g. curl from the host if the port is published). + +## Run in a throwaway Docker container + +The image only installs the SDK and dependencies. **Mount the demo directory** from your checkout so `example.py`, +`map.png`, and `user_scripts/` come from the host (edit the demo without rebuilding the image). + +Build from the **repository root**: + +```bash +docker build -f inorbit_edge/tests/demo/Dockerfile -t inorbit-edge-sdk-demo . +``` + +Run from the **repository root** (adjust paths if you run from elsewhere). Publish `9464` if you want Prometheus +metrics on the host: + +```bash +docker run --rm -p 9464:9464 \ + -v "$PWD/inorbit_edge/tests/demo:/demo:ro" \ + -e INORBIT_URL=... 
\ + -e INORBIT_API_URL=... \ + -e INORBIT_API_KEY=... \ + -e INORBIT_ACCOUNT_ID=... \ + -e INORBIT_ROBOT_ID_PREFIX=$(hostname) \ + inorbit-edge-sdk-demo +``` + +`INORBIT_USE_SSL` defaults to **true** in the demo (required for InOrbit staging/production). Only set +`INORBIT_USE_SSL=false` if you use a local broker without TLS. + +The image sets `INORBIT_METRICS_PORT=9464` and `INORBIT_METRICS_ADDR=0.0.0.0` by default; override or unset +`INORBIT_METRICS_PORT` to disable the metrics HTTP server. diff --git a/inorbit_edge/tests/demo/example.py b/inorbit_edge/tests/demo/example.py index 3bf63b7..c37b236 100644 --- a/inorbit_edge/tests/demo/example.py +++ b/inorbit_edge/tests/demo/example.py @@ -2,13 +2,14 @@ # -*- coding: utf-8 -*- import logging -from time import sleep -from random import randint, uniform, random -from math import pi import os +import socket import sys -from math import inf +from time import sleep +from random import randint, uniform, random +from math import pi, inf +from inorbit_edge.metrics import setup_prometheus_meter_provider from inorbit_edge.robot import ( RobotSessionFactory, RobotSessionPool, @@ -17,6 +18,11 @@ ) from inorbit_edge.video import OpenCVCamera +try: + from prometheus_client import start_http_server +except ImportError: + start_http_server = None + logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", @@ -34,16 +40,18 @@ NUM_ROBOTS = 2 NUM_LASERS = 3 -ROBOT_FOOTPRINT = RobotFootprintSpec( - footprint=[ - {"x": -0.5, "y": -0.5}, - {"x": 0.3, "y": -0.5}, - {"x": 0.7, "y": 0.0}, - {"x": 0.3, "y": 0.5}, - {"x": -0.5, "y": 0.5}, - ], - radius=0.2, -) + +def _mqtt_use_ssl(): + """Use TLS for MQTT unless INORBIT_USE_SSL is false, 0, no, or off.""" + v = os.environ.get("INORBIT_USE_SSL", "true").strip().lower() + return v not in ("false", "0", "no", "off") + + +def _required_env(name): + value = os.environ.get(name, "").strip() + if not value: + raise RuntimeError(f"Environment variable {name} is 
required") + return value class FakeRobot: @@ -145,46 +153,90 @@ def my_command_handler(robot_id, command_name, args, options): options["result_function"]("0") -if __name__ == "__main__": - inorbit_api_endpoint = os.environ.get("INORBIT_URL") - inorbit_api_url = os.environ.get("INORBIT_API_URL") - inorbit_account_id = os.environ.get("INORBIT_ACCOUNT_ID") - inorbit_api_use_ssl = os.environ.get("INORBIT_USE_SSL") - inorbit_api_key = os.environ.get("INORBIT_API_KEY") +def _init_prometheus_metrics(): + """Serve /metrics when INORBIT_METRICS_PORT is set (pip extra telemetry).""" + port_s = os.environ.get("INORBIT_METRICS_PORT", "").strip() + if not port_s: + return + try: + port = int(port_s) + except ValueError: + logging.warning("INORBIT_METRICS_PORT is not a valid integer: %r", port_s) + return + if port <= 0: + return + + service_name = os.environ.get( + "INORBIT_METRICS_SERVICE_NAME", "inorbit-edge-sdk-demo" + ) + if ( + not setup_prometheus_meter_provider( + service_name=service_name, + service_instance_id=socket.gethostname(), + ) + or start_http_server is None + ): + logging.warning( + "INORBIT_METRICS_PORT=%s set but telemetry packages missing. " + "Use: pip install 'inorbit-edge[telemetry]'", + port_s, + ) + return + + host = os.environ.get("INORBIT_METRICS_ADDR", "0.0.0.0") + start_http_server(port=port, addr=host) + logging.info( + "OpenTelemetry metrics (Prometheus) on http://%s:%s/metrics", + host, + port, + ) + + +def main(): + _init_prometheus_metrics() - # For InOrbit Connect (https://connect.inorbit.ai/) certified robots, - # use a yaml file to define the robot_key for each robot_id. This - # file stores additional params such as robot_name, etc. 
- inorbit_robots_config = os.environ.get("INORBIT_ROBOT_CONFIG_FILE") + robot_footprint = RobotFootprintSpec( + footprint=[ + {"x": -0.5, "y": -0.5}, + {"x": 0.3, "y": -0.5}, + {"x": 0.7, "y": 0.0}, + {"x": 0.3, "y": 0.5}, + {"x": -0.5, "y": 0.5}, + ], + radius=0.2, + ) + + inorbit_api_endpoint = _required_env("INORBIT_URL") + inorbit_api_url = _required_env("INORBIT_API_URL") + inorbit_account_id = _required_env("INORBIT_ACCOUNT_ID") + inorbit_api_key = _required_env("INORBIT_API_KEY") # If configured stream video as if it was a robot camera video_url = os.environ.get("INORBIT_VIDEO_URL") - assert inorbit_api_endpoint, "Environment variable INORBIT_URL not specified" - assert inorbit_api_key, "Environment variable INORBIT_API_KEY not specified" - # Required for setting configurations, such as robot footprints. - assert inorbit_api_url, "Environment variable INORBIT_API_URL not specified" - assert inorbit_account_id, "Environment variable INORBIT_ACCOUNT_ID not specified" + # Robot ids are always "{prefix}_edgesdk_demo_{index}". Prefix is mandatory. 
+ robot_id_prefix = _required_env("INORBIT_ROBOT_ID_PREFIX") + logging.info("Robot id prefix: %r", robot_id_prefix) # Create robot session factory and session pool robot_session_factory = RobotSessionFactory( endpoint=inorbit_api_endpoint, rest_api_endpoint=inorbit_api_url, api_key=inorbit_api_key, - use_ssl=inorbit_api_use_ssl == "true", + use_ssl=_mqtt_use_ssl(), account_id=inorbit_account_id, ) robot_session_factory.register_command_callback(log_command) robot_session_factory.register_command_callback(my_command_handler) robot_session_factory.register_commands_path("./user_scripts", r".*\.sh") - robot_session_pool = RobotSessionPool(robot_session_factory, inorbit_robots_config) + robot_session_pool = RobotSessionPool(robot_session_factory) # Dictionary mapping robot ID and fake robot object fake_robot_pool = dict() # Create fake robots and populate `fake_robot_pool` dictionary for i in range(NUM_ROBOTS): - cur_robot_id = "edgesdk_py_{}".format(i) + cur_robot_id = "{}_edgesdk_demo_{}".format(robot_id_prefix, i) robot_session = robot_session_pool.get_session( robot_id=cur_robot_id, robot_name=cur_robot_id ) @@ -220,8 +272,8 @@ def my_command_handler(robot_id, command_name, args, options): robot_session.register_lasers(configs) # Configure robot footprint - if ROBOT_FOOTPRINT: - robot_session.apply_footprint(ROBOT_FOOTPRINT) + if robot_footprint: + robot_session.apply_footprint(robot_footprint) # Go through every fake robot and simulate robot movement while True: @@ -265,7 +317,7 @@ def my_command_handler(robot_id, command_name, args, options): ) # Publish multiple lasers - ranges, angles = [], [] + ranges = [] for i in range(NUM_LASERS): # Generate random lidar ranges within arbitrary limits lidar = [max(LIDAR_MIN, random() * LIDAR_MAX) for _ in range(700)] @@ -285,3 +337,7 @@ def my_command_handler(robot_id, command_name, args, options): except KeyboardInterrupt: robot_session_pool.tear_down() sys.exit() + + +if __name__ == "__main__": + main() diff --git 
a/inorbit_edge/tests/demo/robots_config_example.yaml b/inorbit_edge/tests/demo/robots_config_example.yaml deleted file mode 100644 index 8ee1e02..0000000 --- a/inorbit_edge/tests/demo/robots_config_example.yaml +++ /dev/null @@ -1,10 +0,0 @@ -# robot_id to robot config mappings. Make sure to replace each robot_key -# with the appropriate (unique) values generated through -# https://api.inorbit.ai/docs/index.html#operation/generateRobotKey ---- -edgesdk_py_0: - robot_name: 'edgesdk_py_0' - robot_key: 'robotkey_0' -edgesdk_py_1: - robot_name: 'edgesdk_py_1' - robot_key: 'robotkey_1' diff --git a/inorbit_edge/tests/test_metrics.py b/inorbit_edge/tests/test_metrics.py new file mode 100644 index 0000000..8f85d8a --- /dev/null +++ b/inorbit_edge/tests/test_metrics.py @@ -0,0 +1,447 @@ +# SPDX-FileCopyrightText: 2026 InOrbit, Inc. +# SPDX-License-Identifier: MIT + +import asyncio +import warnings + +import pytest + +from opentelemetry.metrics import _internal as _otel_internal +from inorbit_edge import metrics as edge_metrics + + +class _RecordingCounter: + """Stand-in for a real OTEL counter that records .add() calls.""" + + def __init__(self): + self.calls = [] + + def add(self, amount, attributes=None): + self.calls.append((amount, dict(attributes) if attributes else {})) + + +def test_with_counter_metric_sync_no_attributes(): + counter = _RecordingCounter() + + @edge_metrics.with_counter_metric(counter) + def add(a, b): + return a + b + + assert add(1, 2) == 3 + assert counter.calls == [(1, {})] + + +def test_with_counter_metric_async_no_attributes(): + counter = _RecordingCounter() + + @edge_metrics.with_counter_metric(counter) + async def add(a, b): + return a + b + + result = asyncio.run(add(2, 3)) + assert result == 5 + assert counter.calls == [(1, {})] + + +def test_with_counter_metric_sync_static_attributes(): + counter = _RecordingCounter() + + @edge_metrics.with_counter_metric(counter, attributes={"endpoint": "/x"}) + def f(): + return "ok" + + f() + assert 
counter.calls == [(1, {"endpoint": "/x"})] + + +def test_with_counter_metric_callable_attributes_receives_args(): + counter = _RecordingCounter() + + @edge_metrics.with_counter_metric( + counter, + attributes=lambda a, b=None: {"a": str(a), "b": str(b)}, + ) + def f(a, b=None): + return a + + f(1, b=2) + assert counter.calls == [(1, {"a": "1", "b": "2"})] + + +def test_with_counter_metric_counts_even_when_wrapped_raises(): + counter = _RecordingCounter() + + @edge_metrics.with_counter_metric(counter) + def f(): + raise RuntimeError("boom") + + with pytest.raises(RuntimeError): + f() + assert counter.calls == [(1, {})] + + +def test_with_counter_metric_async_alias_emits_deprecation_warning(): + counter = _RecordingCounter() + + with warnings.catch_warnings(record=True) as captured: + warnings.simplefilter("always") + + @edge_metrics.with_counter_metric_async(counter) + async def f(): + return 1 + + asyncio.run(f()) + + assert any(issubclass(w.category, DeprecationWarning) for w in captured) + assert counter.calls == [(1, {})] + + +def test_wrapped_function_preserves_name_and_docstring(): + counter = _RecordingCounter() + + @edge_metrics.with_counter_metric(counter) + def original(x): + """original docstring.""" + return x + + assert original.__name__ == "original" + assert "original docstring" in (original.__doc__ or "") + + +def test_publish_pose_counter_receives_robot_id_attribute( + mock_mqtt_client, monkeypatch +): + """RobotSession.publish_pose adds a robot_id attribute to its counter.""" + from inorbit_edge.robot import RobotSession + + calls = [] + + def _spy_add(amount, attributes=None): + calls.append((amount, dict(attributes) if attributes else {})) + + monkeypatch.setattr(edge_metrics.publish_pose_counter, "add", _spy_add) + + session = RobotSession( + robot_id="test-robot-1", robot_name="test-robot-1", api_key="ak" + ) + session.publish_pose(x=1.0, y=2.0, yaw=0.0, frame_id="map") + + assert calls, "counter was not called" + amount, attrs = calls[0] + 
assert amount == 1 + assert attrs.get("robot_id") == "test-robot-1" + + +def test_publish_laser_increments_laser_counter_once(mock_mqtt_client, monkeypatch): + """publish_laser delegates to publish_lasers; count once, not on both methods.""" + from inorbit_edge.robot import RobotSession + + calls = [] + + def _spy_add(amount, attributes=None): + calls.append((amount, dict(attributes) if attributes else {})) + + monkeypatch.setattr(edge_metrics.publish_laser_counter, "add", _spy_add) + + session = RobotSession(robot_id="laser-bot", robot_name="laser-bot", api_key="ak") + session.publish_laser(0, 0, 0, [1.0, 2.0], frame_id="map") + + assert len(calls) == 1 + assert calls[0][0] == 1 + assert calls[0][1].get("robot_id") == "laser-bot" + + +@pytest.mark.parametrize( + "counter_name,method_name,method_kwargs", + [ + ( + "publish_key_values_counter", + "publish_key_values", + {"key_values": {"k": "v"}}, + ), + ( + "publish_odometry_counter", + "publish_odometry", + {"linear_distance": 1.0, "angular_distance": 0.1}, + ), + ("publish_path_counter", "publish_path", {"path_points": []}), + ], +) +def test_publish_methods_all_add_robot_id( + mock_mqtt_client, monkeypatch, counter_name, method_name, method_kwargs +): + """Each decorated publish_* method passes robot_id on its counter.""" + from inorbit_edge.robot import RobotSession + + calls = [] + monkeypatch.setattr( + getattr(edge_metrics, counter_name), + "add", + lambda amount, attributes=None: calls.append( + (amount, dict(attributes) if attributes else {}) + ), + ) + + session = RobotSession( + robot_id="fleet-bot-7", robot_name="fleet-bot-7", api_key="ak" + ) + try: + getattr(session, method_name)(**method_kwargs) + except Exception: + # We only care that the counter was called before the body runs + pass + + assert calls, f"counter {counter_name} was not called" + amount, attrs = calls[0] + assert amount == 1 + assert attrs.get("robot_id") == "fleet-bot-7" + + +# --- Tests for the public Prometheus-setup helpers 
------------------------ + + +@pytest.fixture(autouse=False) +def reset_meter_provider(): + """Reset OTEL global provider state before/after the test.""" + from opentelemetry.util._once import Once + + _otel_internal._METER_PROVIDER = None + _otel_internal._PROXY_METER_PROVIDER = _otel_internal._ProxyMeterProvider() + _otel_internal._METER_PROVIDER_SET_ONCE = Once() + yield + _otel_internal._METER_PROVIDER = None + _otel_internal._PROXY_METER_PROVIDER = _otel_internal._ProxyMeterProvider() + _otel_internal._METER_PROVIDER_SET_ONCE = Once() + + +def test_get_meter_returns_real_meter_when_otel_available(): + m = edge_metrics.get_meter("inorbit_test") + # When OTEL is available we get a Meter (likely a _ProxyMeter); not the + # local _NoOpMeter sentinel. + assert m is not None + counter = m.create_counter("inorbit.test.counter") + counter.add(1) + + +def test_setup_prometheus_meter_provider_installs_resource(reset_meter_provider): + from opentelemetry import metrics as otel_metrics + from opentelemetry.sdk.metrics import MeterProvider + + installed = edge_metrics.setup_prometheus_meter_provider( + service_name="inorbit_connector", + service_instance_id="r-1", + service_version="1.0.0", + extra_resource_attributes={"site": "lab"}, + ) + assert installed is True + + provider = otel_metrics.get_meter_provider() + assert isinstance(provider, MeterProvider) + attrs = dict(provider._sdk_config.resource.attributes) + assert attrs["service.name"] == "inorbit_connector" + assert attrs["service.instance.id"] == "r-1" + assert attrs["service.version"] == "1.0.0" + assert attrs["site"] == "lab" + + +def test_setup_prometheus_meter_provider_returns_false_when_disabled( + monkeypatch, caplog +): + monkeypatch.setattr(edge_metrics, "PROMETHEUS_EXPORTER_AVAILABLE", False) + with caplog.at_level("INFO", logger=edge_metrics.__name__): + installed = edge_metrics.setup_prometheus_meter_provider( + service_name="x", service_instance_id="y" + ) + assert installed is False + assert 
"telemetry dependencies are missing" in caplog.text + + +def test_noop_instruments_accept_otel_kwargs(monkeypatch): + monkeypatch.setattr(edge_metrics, "OTEL_API_AVAILABLE", False) + meter = edge_metrics.get_meter("inorbit_test") + context = object() + + counter = meter.create_counter("counter") + counter.add(1, attributes={"robot_id": "r-1"}, context=context, extra=True) + + histogram = meter.create_histogram("histogram") + histogram.record(1.5, attributes={"robot_id": "r-1"}, context=context) + + gauge = meter.create_gauge("gauge") + gauge.set(2, attributes={"robot_id": "r-1"}, context=context) + + +def test_setup_prometheus_meter_provider_uses_service_name_as_prefix(monkeypatch): + captured = {} + + class _Reader: + def __init__(self, *, prefix=""): + captured["prefix"] = prefix + + class _Provider: + def __init__(self, metric_readers, resource): + captured["provider"] = self + self.metric_readers = metric_readers + self.resource = resource + + class _Resource: + @staticmethod + def create(attrs): + return attrs + + monkeypatch.setattr(edge_metrics, "OTEL_API_AVAILABLE", True) + monkeypatch.setattr(edge_metrics, "PROMETHEUS_EXPORTER_AVAILABLE", True) + monkeypatch.setattr(edge_metrics, "PrometheusMetricReader", _Reader) + monkeypatch.setattr(edge_metrics, "_SdkMeterProvider", _Provider) + monkeypatch.setattr(edge_metrics, "Resource", _Resource) + monkeypatch.setattr( + edge_metrics._otel_metrics, + "set_meter_provider", + lambda provider: captured.__setitem__("active_provider", provider), + ) + monkeypatch.setattr( + edge_metrics._otel_metrics, + "get_meter_provider", + lambda: captured["active_provider"], + ) + + installed = edge_metrics.setup_prometheus_meter_provider( + service_name="inorbit-connector", + service_instance_id="r-1", + ) + + assert installed is True + assert captured["prefix"] == "inorbit_connector" + + +def test_setup_prometheus_meter_provider_sanitizes_numeric_prefix(monkeypatch): + captured = {} + + class _Reader: + def __init__(self, *, 
prefix=""): + captured["prefix"] = prefix + + class _Provider: + def __init__(self, metric_readers, resource): + captured["provider"] = self + self.metric_readers = metric_readers + self.resource = resource + + class _Resource: + @staticmethod + def create(attrs): + return attrs + + monkeypatch.setattr(edge_metrics, "OTEL_API_AVAILABLE", True) + monkeypatch.setattr(edge_metrics, "PROMETHEUS_EXPORTER_AVAILABLE", True) + monkeypatch.setattr(edge_metrics, "PrometheusMetricReader", _Reader) + monkeypatch.setattr(edge_metrics, "_SdkMeterProvider", _Provider) + monkeypatch.setattr(edge_metrics, "Resource", _Resource) + monkeypatch.setattr( + edge_metrics._otel_metrics, + "set_meter_provider", + lambda provider: captured.__setitem__("active_provider", provider), + ) + monkeypatch.setattr( + edge_metrics._otel_metrics, + "get_meter_provider", + lambda: captured["active_provider"], + ) + + installed = edge_metrics.setup_prometheus_meter_provider( + service_name="123-connector", + service_instance_id="r-1", + ) + + assert installed is True + assert captured["prefix"] == "_123_connector" + + +def test_setup_prometheus_meter_provider_accepts_prefix_override(monkeypatch): + captured = {} + + class _Reader: + def __init__(self, *, prefix=""): + captured["prefix"] = prefix + + class _Provider: + def __init__(self, metric_readers, resource): + captured["provider"] = self + self.metric_readers = metric_readers + self.resource = resource + + class _Resource: + @staticmethod + def create(attrs): + return attrs + + monkeypatch.setattr(edge_metrics, "OTEL_API_AVAILABLE", True) + monkeypatch.setattr(edge_metrics, "PROMETHEUS_EXPORTER_AVAILABLE", True) + monkeypatch.setattr(edge_metrics, "PrometheusMetricReader", _Reader) + monkeypatch.setattr(edge_metrics, "_SdkMeterProvider", _Provider) + monkeypatch.setattr(edge_metrics, "Resource", _Resource) + monkeypatch.setattr( + edge_metrics._otel_metrics, + "set_meter_provider", + lambda provider: captured.__setitem__("active_provider", 
provider), + ) + monkeypatch.setattr( + edge_metrics._otel_metrics, + "get_meter_provider", + lambda: captured["active_provider"], + ) + + installed = edge_metrics.setup_prometheus_meter_provider( + service_name="inorbit-connector", + service_instance_id="r-1", + exporter_namespace="custom-namespace", + ) + + assert installed is True + assert captured["prefix"] == "custom_namespace" + + +def test_otel_api_available_reflects_import_status(): + # In the test environment the telemetry extra is installed, so OTEL is + # available. The flag is the source of truth for callers. + assert edge_metrics.OTEL_API_AVAILABLE is True + assert edge_metrics.Observation is not None + + +# --- Tests for attrs_from_self ------------------------------------------ + + +def test_attrs_from_self_extracts_named_attributes(): + extract = edge_metrics.attrs_from_self("robot_id", "site") + + class _Stub: + robot_id = "r-7" + site = "lab" + + assert extract(_Stub()) == {"robot_id": "r-7", "site": "lab"} + + +def test_attrs_from_self_used_with_with_counter_metric(): + counter = _RecordingCounter() + + class _Thing: + robot_id = "r-1" + + @edge_metrics.with_counter_metric( + counter, attributes=edge_metrics.attrs_from_self("robot_id") + ) + def do_work(self, _arg): + return _arg + + _Thing().do_work(42) + assert counter.calls == [(1, {"robot_id": "r-1"})] + + +def test_attrs_from_self_raises_when_attribute_missing(): + extract = edge_metrics.attrs_from_self("missing_attr") + + class _Stub: + pass + + with pytest.raises(AttributeError): + extract(_Stub()) diff --git a/requirements-telemetry.txt b/requirements-telemetry.txt new file mode 100644 index 0000000..1513fa9 --- /dev/null +++ b/requirements-telemetry.txt @@ -0,0 +1,5 @@ +opentelemetry-api~=1.41.0 +opentelemetry-sdk~=1.41.0 +# 0.x release line; must match the SDK line (see PyPI for each version’s opentelemetry-sdk pin) +opentelemetry-exporter-prometheus~=0.62b0 +prometheus-client>=0.20,<1.0 diff --git a/requirements.txt b/requirements.txt 
index b759047..8482288 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,3 @@ protobuf~=5.29.6 certifi>=2024.2 deprecated>=1.2,<2.0 rdp2~=1.1.2 -opentelemetry-api~=1.39.1 -opentelemetry-sdk~=1.39.1 diff --git a/setup.py b/setup.py index 1e64a8d..e1572fe 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ long_description = readme_file.read() # Load from the requirements-*.txt files where '*' is anything extra -requirements = {key: [] for key in ["dev", "install", "video"]} +requirements = {key: [] for key in ["dev", "install", "video", "telemetry"]} base_path = os.path.dirname(os.path.abspath(__file__)) for key in requirements: fname = os.path.join( @@ -51,6 +51,7 @@ download_url=f"{GITHUB_REPO}/archive/refs/tags/v2.0.2.zip", extras_require={ "video": requirements["video"], + "telemetry": requirements["telemetry"], "dev": requirements["dev"], }, install_requires=requirements["install"], diff --git a/tox.ini b/tox.ini index bc285ca..bfe0caf 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ deps = -rrequirements.txt -rrequirements-dev.txt -rrequirements-video.txt + -rrequirements-telemetry.txt commands = flake8 inorbit_edge black --check --diff inorbit_edge