From 28e5fa22c7d696d16b29ac4e8d6d5c425543fe42 Mon Sep 17 00:00:00 2001 From: Marcus Date: Sun, 17 May 2026 18:53:18 -0700 Subject: [PATCH 1/2] Add cache metrics --- deployment-examples/metrics/README.md | 70 +++- .../metrics/cache-metrics-wrapper-store.md | 22 +- .../examples/stores-config.json5 | 50 +-- nativelink-config/src/stores.rs | 34 ++ nativelink-store/BUILD.bazel | 2 + nativelink-store/src/cache_metrics_store.rs | 317 ++++++++++++++++++ nativelink-store/src/default_store_factory.rs | 5 + nativelink-store/src/lib.rs | 1 + .../tests/cache_metrics_store_test.rs | 94 ++++++ nativelink-util/src/metrics.rs | 6 + .../docs/docs/deployment-examples/metrics.mdx | 58 +++- 11 files changed, 631 insertions(+), 28 deletions(-) create mode 100644 nativelink-store/src/cache_metrics_store.rs create mode 100644 nativelink-store/tests/cache_metrics_store_test.rs diff --git a/deployment-examples/metrics/README.md b/deployment-examples/metrics/README.md index 010868c5f..fcc586398 100644 --- a/deployment-examples/metrics/README.md +++ b/deployment-examples/metrics/README.md @@ -4,14 +4,24 @@ This directory contains configurations and examples for collecting, processing, ## Overview -NativeLink exposes comprehensive metrics about cache operations and remote execution through OpenTelemetry. These metrics provide insights into: +NativeLink exposes remote execution metrics through OpenTelemetry. Cache +operation metrics are available when the store is explicitly wrapped with the +opt-in `cache_metrics` store wrapper. These metrics provide insights into: -- **Cache Performance**: Hit rates, operation latencies, eviction rates +- **Cache Performance**: Hit rates and operation latencies when `cache_metrics` is enabled - **Execution Pipeline**: Queue times, stage durations, success rates - **System Health**: Worker utilization, throughput, error rates ## Quick Start +NativeLink doesn't expose a Prometheus scrape endpoint directly. It emits OTLP +metrics. To view those metrics in Prometheus, use one of these paths: + +1. NativeLink sends OTLP to an OpenTelemetry Collector, and Prometheus scrapes + the Collector's Prometheus exporter endpoint. +2. NativeLink sends OTLP/HTTP metrics directly to Prometheus with Prometheus' + OTLP receiver enabled. + ### Using Docker Compose (Recommended for Development) 1. Start the metrics stack: @@ -33,11 +43,37 @@ export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=dev,nativelink.instance_ nativelink /path/to/config.json ``` +To emit `nativelink_cache_*` metrics, wrap the CAS and/or AC store you want to +measure: + +```json5 +{ + "name": "CAS_MAIN_STORE", + "cache_metrics": { + "cache_type": "cas", + "backend": { + "filesystem": { + "content_path": "~/.cache/nativelink/content_path-cas", + "temp_path": "~/.cache/nativelink/tmp_path-cas" + } + } + } +} +``` + +If `cache_metrics` is absent, NativeLink constructs the same store graph as it +would without cache metrics. The disabled path doesn't add a wrapper, timer, +attribute allocation, or OpenTelemetry recording call to cache operations. + 4. Access the metrics: - Prometheus UI: http://localhost:9090 - Grafana: http://localhost:3000 (if included) - OTEL Collector metrics: http://localhost:8888/metrics +In this flow, NativeLink sends OTLP to the Collector on `:4317`. The Collector +serves Prometheus-format metrics on its Prometheus exporter endpoint, and +Prometheus scrapes that endpoint. + ### Using Kubernetes 1. Deploy the OTEL Collector: @@ -65,6 +101,9 @@ env: ### Cache Metrics +Cache metrics are opt-in. The following series are emitted only for stores +wrapped with `cache_metrics`; configuring OTEL alone doesn't enable them. + | Metric | Type | Description | Labels | |--------|------|-------------|--------| | `nativelink_cache_operations_total` | Counter | Total cache operations | `cache_type`, `cache_operation_name`, `cache_operation_result` | @@ -160,13 +199,38 @@ See `otel-collector-config.yaml` for a complete example. Prometheus offers native OTLP support and excellent query capabilities. -**Direct OTLP Ingestion:** +**Direct OTLP Ingestion from NativeLink:** ```bash prometheus --web.enable-otlp-receiver \ --storage.tsdb.out-of-order-time-window=30m ``` +Then point NativeLink at Prometheus' OTLP metrics endpoint: + +```bash +export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://localhost:9090/api/v1/otlp/v1/metrics +``` + **Via Collector Scraping:** + +Configure the Collector with a Prometheus exporter: + +```yaml +exporters: + prometheus: + endpoint: "0.0.0.0:9090" + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] +``` + +Then configure Prometheus to scrape the Collector: + ```yaml scrape_configs: - job_name: 'otel-collector' diff --git a/deployment-examples/metrics/cache-metrics-wrapper-store.md b/deployment-examples/metrics/cache-metrics-wrapper-store.md index f40186287..b692908ab 100644 --- a/deployment-examples/metrics/cache-metrics-wrapper-store.md +++ b/deployment-examples/metrics/cache-metrics-wrapper-store.md @@ -4,6 +4,11 @@ Expose consistent, low-cardinality cache metrics (CAS/AC/store backends) without needing to implement bespoke instrumentation inside every individual store implementation. +The implementation is intentionally opt-in through the `cache_metrics` store +wrapper. If a store isn't wrapped, NativeLink constructs the same store graph +as before and doesn't add timers, attribute allocation, or OpenTelemetry +recording calls to that store's hot path. + This document focuses on a **wrapper store** (middleware) approach that can be applied to any `StoreDriver`, and compares it with **instrumenting inside each store**. ## Problem Statement @@ -20,7 +25,7 @@ These should be queryable and composable with low cognitive overhead and consist ### A) Wrapper Store (middleware) -Wrap an existing `Arc` with a new `StoreDriver` that: +Wrap an existing `Arc` with the `cache_metrics` `StoreDriver` that: 1. Starts a timer 2. Calls the inner store method 3. Classifies the outcome (hit/miss/error/etc) @@ -142,4 +147,17 @@ To reach that: - Ensure `deployment-examples/metrics/prometheus-recording-rules.yml` references `_total` counter names. - Keep existing dashboards querying recording rules (for example, `nativelink:cache_hit_rate`) instead of raw high-cardinality series. -If wrapper metrics are **optional/config-gated**, docs may need a small note describing how to enable them; otherwise docs can remain unchanged. +Wrapper metrics are config-gated. Wrap only the logical store layer you want to +measure, for example: + +```json5 +"cache_metrics": { + "cache_type": "cas", + "backend": { + "filesystem": { + "content_path": "~/.cache/nativelink/content_path-cas", + "temp_path": "~/.cache/nativelink/tmp_path-cas" + } + } +} +``` diff --git a/nativelink-config/examples/stores-config.json5 b/nativelink-config/examples/stores-config.json5 index 9c16e9146..4c6a1b7d3 100644 --- a/nativelink-config/examples/stores-config.json5 +++ b/nativelink-config/examples/stores-config.json5 @@ -4,6 +4,18 @@ stores: [ { name: "0", + "cache_metrics": { + "cache_type": "cas", + "backend": { + "filesystem": { + "content_path": "~/.cache/nativelink/content_path-cas", + "temp_path": "~/.cache/nativelink/tmp_path-cas" + } + } + } + }, + { + name: "1", "memory": { "eviction_policy": { "max_bytes": "10mb", @@ -11,7 +23,7 @@ } }, { - name: "1", + name: "2", "experimental_cloud_object_store": { "provider": "aws", "region": "eu-north-1", @@ -26,7 +38,7 @@ } }, { - name: "2", + name: "3", "experimental_cloud_object_store": { "provider": "gcs", "bucket": "test-bucket", @@ -40,7 +52,7 @@ } }, { - name: "3", + name: "4", "experimental_cloud_object_store": { "provider": "azure", "account_name": "cloudshell1393657559", @@ -55,7 +67,7 @@ } }, { - name: "4", + name: "5", "experimental_cloud_object_store": { "provider": "ontap", "endpoint": "https://ontap-s3-endpoint:443", @@ -72,7 +84,7 @@ } }, { - name: "5", + name: "6", "ontap_s3_existence_cache": { "index_path": "/path/to/cache/index.json", "sync_interval_seconds": 300, @@ -85,7 +97,7 @@ } }, { - name: "6", + name: "7", "verify": { "backend": { "memory": { @@ -99,7 +111,7 @@ } }, { - name: "7", + name: "8", "completeness_checking": { "backend": { "filesystem": { @@ -118,7 +130,7 @@ } }, { - name: "8", + name: "9", "compression": { "compression_algorithm": { "lz4": {} @@ -135,7 +147,7 @@ } }, { - name: "9", + name: "10", "dedup": { "index_store": { "memory": { @@ -174,7 +186,7 @@ } }, { - name: "10", + name: "11", "existence_cache": { "backend": { "memory": { @@ -190,7 +202,7 @@ } }, { - name: "11", + name: "12", "fast_slow": { "fast": { "filesystem": { @@ -213,7 +225,7 @@ } }, { - name: "12", + name: "13", "shard": { "stores": [ { @@ -229,7 +241,7 @@ } }, { - name: "13", + name: "14", "filesystem": { "content_path": "/tmp/nativelink/data-worker-test/content_path-cas", "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas", @@ -239,13 +251,13 @@ } }, { - name: "14", + name: "15", "ref_store": { "name": "FS_CONTENT_STORE" } }, { - name: "15", + name: "16", "size_partitioning": { "size": "128mib", "lower_store": { @@ -262,7 +274,7 @@ } }, { - name: "16", + name: "17", "grpc": { "instance_name": "main", "endpoints": [ @@ -283,7 +295,7 @@ } }, { - name: "17", + name: "18", "redis_store": { "addresses": [ "redis://127.0.0.1:6379/", @@ -292,11 +304,11 @@ } }, { - name: "18", + name: "19", "noop": {} }, { - name: "19", + name: "20", "experimental_mongo": { "connection_string": "mongodb://localhost:27017", "database": "nativelink", diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index e752ca051..11b08ff73 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -50,6 +50,28 @@ pub enum ConfigDigestHashFunction { #[serde(rename_all = "snake_case")] #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] pub enum StoreSpec { + /// Cache metrics store wraps another store and emits low-cardinality + /// OpenTelemetry cache operation metrics for the wrapped store. + /// + /// This wrapper is opt-in. Stores that are not explicitly wrapped by + /// `cache_metrics` are constructed exactly as they are without this + /// wrapper and do not pay its hot-path timing or recording cost. + /// + /// **Example JSON Config:** + /// ```json + /// "cache_metrics": { + /// "cache_type": "cas", + /// "backend": { + /// "filesystem": { + /// "content_path": "~/.cache/nativelink/content_path-cas", + /// "temp_path": "~/.cache/nativelink/tmp_path-cas" + /// } + /// } + /// } + /// ``` + /// + CacheMetrics(Box), + /// Memory store will store all data in a hashmap in memory. /// /// **Example JSON Config:** @@ -594,6 +616,18 @@ pub struct ShardSpec { pub stores: Vec, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +#[cfg_attr(feature = "dev-schema", derive(JsonSchema))] +pub struct CacheMetricsSpec { + /// Low-cardinality cache type label for metrics, for example `cas` or `ac`. + #[serde(deserialize_with = "convert_string_with_shellexpand")] + pub cache_type: String, + + /// Store to wrap with cache operation metrics. + pub backend: StoreSpec, +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 9c9f1f251..906da8999 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -12,6 +12,7 @@ rust_library( srcs = [ "src/ac_utils.rs", "src/azure_blob_store.rs", + "src/cache_metrics_store.rs", "src/callback_utils.rs", "src/cas_utils.rs", "src/common_s3_utils.rs", @@ -116,6 +117,7 @@ rust_test_suite( srcs = [ "tests/ac_utils_test.rs", "tests/azure_blob_store_test.rs", + "tests/cache_metrics_store_test.rs", "tests/completeness_checking_store_test.rs", "tests/compression_store_test.rs", "tests/dedup_store_test.rs", diff --git a/nativelink-store/src/cache_metrics_store.rs b/nativelink-store/src/cache_metrics_store.rs new file mode 100644 index 000000000..ce43df620 --- /dev/null +++ b/nativelink-store/src/cache_metrics_store.rs @@ -0,0 +1,317 @@ +// Copyright 2026 The NativeLink Authors. All rights reserved. +// +// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// See LICENSE file for details +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::borrow::BorrowMut; +use core::ops::Bound; +use core::pin::Pin; +use std::ffi::OsString; +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use bytes::Bytes; +use nativelink_config::stores::CacheMetricsSpec; +use nativelink_error::{Code, Error}; +use nativelink_metric::{ + MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, group, publish, +}; +use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; +use nativelink_util::fs; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatusIndicator}; +use nativelink_util::metrics::{CACHE_METRICS, CACHE_TYPE, CacheMetricAttrs}; +use nativelink_util::store_trait::{ + RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, UploadSizeInfo, +}; +use opentelemetry::KeyValue; + +#[derive(Debug)] +pub struct CacheMetricsStore { + backend: Store, + cache_type: String, + attrs: CacheMetricAttrs, +} + +impl CacheMetricsStore { + pub fn new(spec: &CacheMetricsSpec, backend: Store) -> Arc { + Arc::new(Self { + backend, + cache_type: spec.cache_type.clone(), + attrs: CacheMetricAttrs::new(&[KeyValue::new(CACHE_TYPE, spec.cache_type.clone())]), + }) + } + + fn duration_ms(start: Instant) -> f64 { + start.elapsed().as_secs_f64() * 1000.0 + } + + const fn size_info_bytes(size_info: UploadSizeInfo) -> Option { + match size_info { + UploadSizeInfo::ExactSize(size) => Some(size), + UploadSizeInfo::MaxSize(_) => None, + } + } + + fn record_duration(&self, start: Instant, attrs: &[KeyValue]) { + CACHE_METRICS + .cache_operation_duration + .record(Self::duration_ms(start), attrs); + } + + fn record_write_io(&self, bytes: Option) { + if let Some(bytes) = bytes { + CACHE_METRICS + .cache_io + .add(bytes, self.attrs.write_success()); + CACHE_METRICS + .cache_entry_size + .record(bytes, self.attrs.write_success()); + } + } +} + +impl MetricsComponent for CacheMetricsStore { + fn publish( + &self, + _kind: MetricKind, + _field_metadata: MetricFieldData, + ) -> Result { + publish!( + "cache_type", + &self.cache_type, + MetricKind::String, + "Low-cardinality cache type label emitted on cache metrics" + ); + let _enter = group!("backend").entered(); + self.backend + .publish(MetricKind::Component, MetricFieldData::default())?; + Ok(MetricPublishKnownKindData::Component) + } +} + +#[async_trait] +impl StoreDriver for CacheMetricsStore { + async fn has_with_results( + self: Pin<&Self>, + keys: &[StoreKey<'_>], + results: &mut [Option], + ) -> Result<(), Error> { + let start = Instant::now(); + let result = self.backend.has_with_results(keys, results).await; + match &result { + Ok(()) => { + let hits = results.iter().filter(|result| result.is_some()).count(); + let misses = results.len().saturating_sub(hits); + if hits > 0 { + CACHE_METRICS + .cache_operations + .add(hits as u64, self.attrs.read_hit()); + } + if misses > 0 { + CACHE_METRICS + .cache_operations + .add(misses as u64, self.attrs.read_miss()); + } + let duration_attrs = if hits > 0 { + self.attrs.read_hit() + } else { + self.attrs.read_miss() + }; + self.record_duration(start, duration_attrs); + } + Err(_) => { + CACHE_METRICS + .cache_operations + .add(keys.len() as u64, self.attrs.read_error()); + self.record_duration(start, self.attrs.read_error()); + } + } + result + } + + async fn list( + self: Pin<&Self>, + range: (Bound>, Bound>), + handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), + ) -> Result { + self.backend + .list( + ( + range.0.map(StoreKey::into_owned), + range.1.map(StoreKey::into_owned), + ), + handler, + ) + .await + } + + async fn update( + self: Pin<&Self>, + key: StoreKey<'_>, + reader: DropCloserReadHalf, + upload_size: UploadSizeInfo, + ) -> Result<(), Error> { + let start = Instant::now(); + let bytes = Self::size_info_bytes(upload_size); + let result = self.backend.update(key, reader, upload_size).await; + match &result { + Ok(()) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.write_success()); + self.record_write_io(bytes); + self.record_duration(start, self.attrs.write_success()); + } + Err(_) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.write_error()); + self.record_duration(start, self.attrs.write_error()); + } + } + result + } + + fn optimized_for(&self, optimization: StoreOptimizations) -> bool { + self.backend.optimized_for(optimization) + } + + async fn update_with_whole_file( + self: Pin<&Self>, + key: StoreKey<'_>, + path: OsString, + file: fs::FileSlot, + upload_size: UploadSizeInfo, + ) -> Result, Error> { + let start = Instant::now(); + let bytes = Self::size_info_bytes(upload_size); + let result = self + .backend + .update_with_whole_file(key, path, file, upload_size) + .await; + match &result { + Ok(_) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.write_success()); + self.record_write_io(bytes); + self.record_duration(start, self.attrs.write_success()); + } + Err(_) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.write_error()); + self.record_duration(start, self.attrs.write_error()); + } + } + result + } + + async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> { + let start = Instant::now(); + let bytes = data.len() as u64; + let result = self.backend.update_oneshot(key, data).await; + match &result { + Ok(()) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.write_success()); + self.record_write_io(Some(bytes)); + self.record_duration(start, self.attrs.write_success()); + } + Err(_) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.write_error()); + self.record_duration(start, self.attrs.write_error()); + } + } + result + } + + async fn get_part( + self: Pin<&Self>, + key: StoreKey<'_>, + writer: &mut DropCloserWriteHalf, + offset: u64, + length: Option, + ) -> Result<(), Error> { + let start = Instant::now(); + let result = self + .backend + .get_part(key, writer.borrow_mut(), offset, length) + .await; + match &result { + Ok(()) => { + CACHE_METRICS.cache_operations.add(1, self.attrs.read_hit()); + CACHE_METRICS + .cache_io + .add(writer.get_bytes_written(), self.attrs.read_hit()); + self.record_duration(start, self.attrs.read_hit()); + } + Err(err) if err.code == Code::NotFound => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.read_miss()); + self.record_duration(start, self.attrs.read_miss()); + } + Err(_) => { + CACHE_METRICS + .cache_operations + .add(1, self.attrs.read_error()); + self.record_duration(start, self.attrs.read_error()); + } + } + result + } + + fn inner_store(&self, _key: Option) -> &dyn StoreDriver { + self + } + + fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { + self + } + + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn register_health(self: Arc, registry: &mut HealthRegistryBuilder) { + self.backend.clone().register_health(registry); + } + + fn register_remove_callback( + self: Arc, + callback: Arc, + ) -> Result<(), Error> { + self.backend.register_remove_callback(callback) + } +} + +#[async_trait] +impl HealthStatusIndicator for CacheMetricsStore { + fn get_name(&self) -> &'static str { + "CacheMetricsStore" + } + + async fn check_health( + &self, + namespace: std::borrow::Cow<'static, str>, + ) -> nativelink_util::health_utils::HealthStatus { + self.backend + .as_store_driver_pin() + .check_health(namespace) + .await + } +} diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 0dc11ca47..2e9bce74a 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -24,6 +24,7 @@ use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::store_trait::{Store, StoreDriver}; use crate::azure_blob_store::AzureBlobStore; +use crate::cache_metrics_store::CacheMetricsStore; use crate::completeness_checking_store::CompletenessCheckingStore; use crate::compression_store::CompressionStore; use crate::dedup_store::DedupStore; @@ -54,6 +55,10 @@ pub fn store_factory<'a>( ) -> Pin> { Box::pin(async move { let store: Arc = match backend { + StoreSpec::CacheMetrics(spec) => CacheMetricsStore::new( + spec, + store_factory(&spec.backend, store_manager, None).await?, + ), StoreSpec::Memory(spec) => MemoryStore::new(spec), StoreSpec::ExperimentalCloudObjectStore(spec) => match spec { ExperimentalCloudObjectSpec::Aws(aws_config) => { diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs index 43539d2e1..a6345f30f 100644 --- a/nativelink-store/src/lib.rs +++ b/nativelink-store/src/lib.rs @@ -14,6 +14,7 @@ pub mod ac_utils; pub mod azure_blob_store; +pub mod cache_metrics_store; pub mod callback_utils; pub mod cas_utils; pub mod common_s3_utils; diff --git a/nativelink-store/tests/cache_metrics_store_test.rs b/nativelink-store/tests/cache_metrics_store_test.rs new file mode 100644 index 000000000..fbc9e12b4 --- /dev/null +++ b/nativelink-store/tests/cache_metrics_store_test.rs @@ -0,0 +1,94 @@ +// Copyright 2026 The NativeLink Authors. All rights reserved. +// +// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// See LICENSE file for details +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use nativelink_config::stores::{CacheMetricsSpec, MemorySpec, StoreSpec}; +use nativelink_error::{Code, Error}; +use nativelink_macro::nativelink_test; +use nativelink_store::cache_metrics_store::CacheMetricsStore; +use nativelink_store::default_store_factory::store_factory; +use nativelink_store::store_manager::StoreManager; +use nativelink_util::common::DigestInfo; +use nativelink_util::store_trait::StoreLike; +use pretty_assertions::assert_eq; + +const VALID_HASH: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; +const MISSING_HASH: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef"; + +#[nativelink_test] +async fn cache_metrics_store_is_only_constructed_when_configured() -> Result<(), Error> { + let store_manager = Arc::new(StoreManager::new()); + + let plain_store = store_factory( + &StoreSpec::Memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + assert!( + plain_store + .downcast_ref::(None) + .is_none(), + "plain memory store should not be wrapped with cache metrics" + ); + + let wrapped_store = store_factory( + &StoreSpec::CacheMetrics(Box::new(CacheMetricsSpec { + cache_type: "cas".to_string(), + backend: StoreSpec::Memory(MemorySpec::default()), + })), + &store_manager, + None, + ) + .await?; + assert!( + wrapped_store + .downcast_ref::(None) + .is_some(), + "cache_metrics config should construct the metrics wrapper" + ); + + Ok(()) +} + +#[nativelink_test] +async fn cache_metrics_store_preserves_backend_store_semantics() -> Result<(), Error> { + let store_manager = Arc::new(StoreManager::new()); + let store = store_factory( + &StoreSpec::CacheMetrics(Box::new(CacheMetricsSpec { + cache_type: "cas".to_string(), + backend: StoreSpec::Memory(MemorySpec::default()), + })), + &store_manager, + None, + ) + .await?; + + let value = "cache metrics payload"; + let digest = DigestInfo::try_new(VALID_HASH, value.len())?; + store.update_oneshot(digest, value.into()).await?; + + assert_eq!(store.has(digest).await?, Some(value.len() as u64)); + assert_eq!( + store.get_part_unchunked(digest, 0, None).await?, + value.as_bytes() + ); + + let missing_digest = DigestInfo::try_new(MISSING_HASH, value.len())?; + let missing = store.get_part_unchunked(missing_digest, 0, None).await; + assert_eq!(missing.unwrap_err().code, Code::NotFound); + + Ok(()) +} diff --git a/nativelink-util/src/metrics.rs b/nativelink-util/src/metrics.rs index a4ffc616b..c04a75cc7 100644 --- a/nativelink-util/src/metrics.rs +++ b/nativelink-util/src/metrics.rs @@ -177,6 +177,7 @@ pub struct CacheMetricAttrs { read_hit: Vec, read_miss: Vec, read_expired: Vec, + read_error: Vec, // Write operation attributes write_success: Vec, @@ -210,6 +211,7 @@ impl CacheMetricAttrs { read_hit: make_attrs(CacheOperationName::Read, CacheOperationResult::Hit), read_miss: make_attrs(CacheOperationName::Read, CacheOperationResult::Miss), read_expired: make_attrs(CacheOperationName::Read, CacheOperationResult::Expired), + read_error: make_attrs(CacheOperationName::Read, CacheOperationResult::Error), write_success: make_attrs(CacheOperationName::Write, CacheOperationResult::Success), write_error: make_attrs(CacheOperationName::Write, CacheOperationResult::Error), @@ -237,6 +239,10 @@ impl CacheMetricAttrs { &self.read_expired } #[must_use] + pub fn read_error(&self) -> &[KeyValue] { + &self.read_error + } + #[must_use] pub fn write_success(&self) -> &[KeyValue] { &self.write_success } diff --git a/web/platform/src/content/docs/docs/deployment-examples/metrics.mdx b/web/platform/src/content/docs/docs/deployment-examples/metrics.mdx index 8d4a71acc..9b4614d89 100644 --- a/web/platform/src/content/docs/docs/deployment-examples/metrics.mdx +++ b/web/platform/src/content/docs/docs/deployment-examples/metrics.mdx @@ -4,18 +4,23 @@ description: 'Configure OpenTelemetry metrics collection for NativeLink' --- import { Tabs, TabItem } from '@astrojs/starlight/components'; -NativeLink provides comprehensive metrics through OpenTelemetry (OTEL), enabling deep insights into cache performance, remote execution pipelines, and system health. +NativeLink provides metrics through OpenTelemetry (OTEL), enabling insight into remote execution pipelines and system health. Cache operation metrics are available when the relevant store is explicitly wrapped with the opt-in `cache_metrics` store wrapper. ## Overview -NativeLink automatically exports metrics when configured with OTEL environment variables. The metrics cover: +NativeLink automatically exports execution metrics when configured with OTEL environment variables. Cache metrics require an explicit store wrapper. The metrics cover: -- **Cache Operations**: Hit rates, latencies, evictions +- **Cache Operations**: Hit rates and latencies when `cache_metrics` is enabled - **Execution Pipeline**: Queue depths, stage durations, success rates - **System Health**: Worker utilization, throughput, error rates ## Quick Start +NativeLink doesn't expose a Prometheus scrape endpoint directly. It emits OTLP metrics. To view those metrics in Prometheus, use one of these paths: + +1. NativeLink sends OTLP to an OpenTelemetry Collector, and Prometheus scrapes the Collector's Prometheus exporter endpoint. +2. NativeLink sends OTLP/HTTP metrics directly to Prometheus with Prometheus' OTLP receiver enabled. + @@ -37,11 +42,32 @@ export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=dev" nativelink /path/to/config.json ``` +To emit `nativelink_cache_*` metrics, wrap the CAS and/or AC store you want to measure: + +```json5 +{ + "name": "CAS_MAIN_STORE", + "cache_metrics": { + "cache_type": "cas", + "backend": { + "filesystem": { + "content_path": "~/.cache/nativelink/content_path-cas", + "temp_path": "~/.cache/nativelink/tmp_path-cas" + } + } + } +} +``` + +If `cache_metrics` is absent, NativeLink constructs the same store graph as it would without cache metrics. The disabled path doesn't add a wrapper, timer, attribute allocation, or OpenTelemetry recording call to cache operations. + Access the services: - Prometheus: http://localhost:9091 - Grafana: http://localhost:3000 (admin/admin) - OTEL Collector: http://localhost:8888/metrics +In this flow, NativeLink sends OTLP to the Collector on `:4317`. The Collector serves Prometheus-format metrics on its Prometheus exporter endpoint, and Prometheus scrapes that endpoint. + @@ -124,11 +150,35 @@ processors: send_batch_size: 1024 ``` +To make Prometheus scrape the Collector, configure a Prometheus exporter in the Collector and a matching scrape job in Prometheus: + +```yaml +# otel-collector.yaml +exporters: + prometheus: + endpoint: "0.0.0.0:9090" + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] +``` + +```yaml +# prometheus.yml +scrape_configs: + - job_name: otel-collector + static_configs: + - targets: ["otel-collector:9090"] +``` + ## Metrics Reference ### Cache Metrics -Monitor cache performance and efficiency: +Monitor cache performance and efficiency. These series are emitted only for stores wrapped with `cache_metrics`; configuring OTEL alone doesn't enable them. | Metric | Description | Key Labels | |--------|-------------|------------| From 15efde4225e36c9270d598c9ebbd7e2d1c4d3db1 Mon Sep 17 00:00:00 2001 From: Marcus Eagan Date: Mon, 18 May 2026 07:13:38 -0700 Subject: [PATCH 2/2] Update nativelink-store/src/cache_metrics_store.rs Co-authored-by: Tom Parker-Shemilt --- nativelink-store/src/cache_metrics_store.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/nativelink-store/src/cache_metrics_store.rs b/nativelink-store/src/cache_metrics_store.rs index ce43df620..aa089a93f 100644 --- a/nativelink-store/src/cache_metrics_store.rs +++ b/nativelink-store/src/cache_metrics_store.rs @@ -144,15 +144,7 @@ impl StoreDriver for CacheMetricsStore { range: (Bound>, Bound>), handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), ) -> Result { - self.backend - .list( - ( - range.0.map(StoreKey::into_owned), - range.1.map(StoreKey::into_owned), - ), - handler, - ) - .await + self.backend.list(range, handler).await } async fn update(