Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
98fc6e5
feat(datadog-ffe): add flagevaluation EVP payload module + Cargo feat…
leoromanovsky Jun 12, 2026
89a2ba7
feat(datadog-sidecar): add FfeFlagEvaluationBatch sidecar action + EV…
leoromanovsky Jun 12, 2026
de8e987
fix(ffe): make flagevaluation batch bincode-safe over the sidecar IPC
leoromanovsky Jun 14, 2026
d02b30a
style(ffe): rustfmt + fix clippy expect-on-Option in enqueue_actions_…
leoromanovsky Jun 15, 2026
22f5642
fix(ffe): align flagevaluation sidecar delivery with worker schema
leoromanovsky Jun 16, 2026
c73a381
chore(ffe): apply flagevaluation rustfmt
leoromanovsky Jun 16, 2026
5e0ebf9
Merge remote-tracking branch 'origin/main' into leo.romanovsky/ffl-24…
leoromanovsky Jun 16, 2026
b576ea7
coalesce flagevaluation batches in sidecar
leoromanovsky Jun 17, 2026
53f81e5
fix(ffe): bound flagevaluation sidecar delivery
leoromanovsky Jun 17, 2026
fdfacd2
Merge remote-tracking branch 'origin/main' into leo.romanovsky/ffl-24…
leoromanovsky Jun 19, 2026
11ca09c
Update flagevaluation EVP endpoint contract
leoromanovsky Jun 19, 2026
3350d4b
Fix flagevaluation flusher clippy handling
leoromanovsky Jun 19, 2026
e16802b
Share FFE EVP proxy transport
leoromanovsky Jun 23, 2026
60fe825
fix(datadog-sidecar): satisfy clippy for EVP constants
leoromanovsky Jun 23, 2026
46734bc
fix(ffe): bound flagevaluation evp payloads
leoromanovsky Jun 23, 2026
2fa796b
fix(sidecar): emit flagevaluation telemetry counters
leoromanovsky Jun 23, 2026
4d38cc5
refactor(sidecar): share EVP proxy constants
leoromanovsky Jun 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions datadog-ffe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@ libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true }
prost = { version = "0.14.1", optional = true }
pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] }

[dev-dependencies]
# Matches the sidecar's bincode version. Used by the flagevaluation bincode
# round-trip test, which guards against `skip_serializing_if` reintroducing the
# worker→sidecar IPC field-misalignment bug (bincode is non-self-describing).
bincode = { version = "1.3.3" }

[features]
default = ["remote-config"]
exposure-events = ["dep:lru"]
evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"]
flagevaluation-evp = []
pyo3 = ["dep:pyo3"]
remote-config = ["dep:libdd-remote-config"]
6 changes: 5 additions & 1 deletion datadog-ffe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ mod flag_type;
#[cfg(feature = "remote-config")]
mod remote_config;
pub mod rules_based;
#[cfg(any(feature = "exposure-events", feature = "evaluation-metrics"))]
#[cfg(any(
feature = "exposure-events",
feature = "evaluation-metrics",
feature = "flagevaluation-evp"
))]
pub mod telemetry;

pub use flag_type::{ExpectedFlagType, FlagType};
448 changes: 448 additions & 0 deletions datadog-ffe/src/telemetry/flagevaluation.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions datadog-ffe/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
pub mod evaluation_metrics;
#[cfg(feature = "exposure-events")]
pub mod exposures;
#[cfg(feature = "flagevaluation-evp")]
pub mod flagevaluation;

use serde::{Deserialize, Serialize};

Expand Down
147 changes: 143 additions & 4 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ use datadog_sidecar::service::agent_info::AgentInfoReader;
use datadog_sidecar::service::telemetry::InternalTelemetryAction;
use datadog_sidecar::service::{
blocking::{self, SidecarTransport},
DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric,
FfeExposure as SidecarFfeExposure, FfeExposureBatch as SidecarFfeExposureBatch,
FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata,
SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions,
AllocationKey, ContextDD, DynamicInstrumentationConfigState, EvalError,
FfeEvaluationMetric as SidecarFfeEvaluationMetric, FfeExposure as SidecarFfeExposure,
FfeExposureBatch as SidecarFfeExposureBatch,
FfeFlagEvaluationBatch as SidecarFfeFlagEvaluationBatch,
FfeFlagEvaluationEvent as SidecarFfeFlagEvaluationEvent,
FfeTelemetryContext as SidecarFfeTelemetryContext, FlagEvalEventContext, FlagKey, InstanceId,
QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
SidecarFlushOptions, TargetingRuleKey, VariantKey,
};
use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions};
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
Expand Down Expand Up @@ -1172,6 +1176,23 @@ pub struct FfeEvaluationMetric<'a> {
pub allocation_key: CharSlice<'a>,
}

/// One flag evaluation record passed across the C FFI boundary.
///
/// Converted to the sidecar's flag evaluation event type by
/// `ffe_flag_evaluation_from_ffi`. The layout is `#[repr(C)]`, so field order and
/// types are part of the ABI.
#[repr(C)]
pub struct FfeFlagEvaluation<'a> {
/// Forwarded unchanged as the event `timestamp` (milliseconds, per the `_ms` suffix).
pub timestamp_ms: i64,
/// Flag key; converted with `char_slice_to_string`, so it must be valid UTF-8.
pub flag_key: CharSlice<'a>,
/// Forwarded unchanged as the event `first_evaluation`.
pub first_evaluation_ms: i64,
/// Forwarded unchanged as the event `last_evaluation`.
pub last_evaluation_ms: i64,
/// Forwarded unchanged as the event `evaluation_count`.
pub evaluation_count: u64,
/// Optional variant key (converted through `optional_string`; presumably an empty
/// slice means "absent" — confirm against `optional_string`).
pub variant: CharSlice<'a>,
/// Optional allocation key (converted through `optional_string`).
pub allocation_key: CharSlice<'a>,
/// Optional targeting rule key (converted through `optional_string`).
pub targeting_rule_key: CharSlice<'a>,
/// Optional targeting key (converted through `optional_string`).
pub targeting_key: CharSlice<'a>,
/// UTF-8 JSON object. Empty, invalid, or non-object JSON is omitted.
pub evaluation_context_json: CharSlice<'a>,
/// Optional evaluation error message (converted through `optional_string`).
pub error_message: CharSlice<'a>,
/// Forwarded unchanged as the event `runtime_default_used`.
pub runtime_default_used: bool,
}

/// Send structured FFE exposure events to the sidecar. The sidecar owns
/// deduplication, JSON serialization, and Agent EVP delivery. This function is
/// caller-driven; shared libdatadog evaluator calls do not log unless an SDK
Expand Down Expand Up @@ -1243,6 +1264,78 @@ fn ddog_sidecar_send_ffe_exposure_batch_impl(
MaybeError::None
}

/// Forward a batch of structured FFE flag evaluation events to the sidecar.
///
/// JSON serialization and Agent EVP delivery are the sidecar's responsibility.
/// The call is caller-driven: aggregation and bounding of event cardinality must
/// happen before the batch is handed over.
///
/// A panic raised while processing the batch is caught and reported as an error
/// instead of unwinding across the FFI boundary.
///
/// # Safety
/// `context` and every element in `flag_evaluations` must contain valid UTF-8
/// `CharSlice` values. Empty `flag_evaluations` is a no-op.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_send_ffe_flag_evaluation_batch(
    transport: &mut Box<SidecarTransport>,
    instance_id: &InstanceId,
    queue_id: &QueueId,
    context: &FfeTelemetryContext<'_>,
    flag_evaluations: Slice<FfeFlagEvaluation<'_>>,
) -> MaybeError {
    let guarded_call = std::panic::AssertUnwindSafe(|| {
        ddog_sidecar_send_ffe_flag_evaluation_batch_impl(
            transport,
            instance_id,
            queue_id,
            context,
            flag_evaluations,
        )
    });

    match std::panic::catch_unwind(guarded_call) {
        Ok(outcome) => outcome,
        Err(panic) => MaybeError::Some(libdd_common_ffi::utils::handle_panic_error(
            panic,
            "ddog_sidecar_send_ffe_flag_evaluation_batch",
        )),
    }
}

/// Converts an FFI flag evaluation batch and enqueues it on the sidecar.
///
/// Returns `MaybeError::None` without converting the context or contacting the
/// sidecar when `flag_evaluations` is empty. A failure to view the slice, convert
/// the context, convert any single event, or enqueue the action is handed to
/// `try_c!`.
fn ddog_sidecar_send_ffe_flag_evaluation_batch_impl(
    transport: &mut Box<SidecarTransport>,
    instance_id: &InstanceId,
    queue_id: &QueueId,
    context: &FfeTelemetryContext<'_>,
    flag_evaluations: Slice<FfeFlagEvaluation<'_>>,
) -> MaybeError {
    let raw_events = try_c!(flag_evaluations
        .try_as_slice()
        .map_err(|e| format!("Invalid flag evaluation slice: {e}")));

    // An empty batch is a documented no-op: skip context conversion and the IPC send.
    if raw_events.is_empty() {
        return MaybeError::None;
    }

    let context = try_c!(ffe_context_from_ffi(context));
    // The conversion is one-to-one and fails as a whole on the first bad event, so a
    // successful result holds exactly as many events as the (non-empty) input. No
    // second emptiness check is needed.
    let flag_evaluations = try_c!(raw_events
        .iter()
        .map(|event| ffe_flag_evaluation_from_ffi(event, &context.service))
        .collect::<Result<Vec<_>, _>>());

    try_c!(blocking::enqueue_actions_reliable(
        transport,
        instance_id,
        queue_id,
        vec![SidecarAction::FfeFlagEvaluationBatch(
            SidecarFfeFlagEvaluationBatch {
                context,
                flag_evaluations,
            }
        )],
    ));
    MaybeError::None
}

/// Send structured FFE evaluation metric events to the sidecar. The sidecar
/// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This
/// function is caller-driven so SDKs with existing host-language hooks can
Expand Down Expand Up @@ -1307,6 +1400,37 @@ fn ffe_exposure_from_ffi(exposure: &FfeExposure<'_>) -> Result<SidecarFfeExposur
})
}

fn ffe_flag_evaluation_from_ffi(
event: &FfeFlagEvaluation<'_>,
service: &str,
) -> Result<SidecarFfeFlagEvaluationEvent, String> {
let evaluation = optional_json_object_string(event.evaluation_context_json)?;
let context = evaluation.map(|evaluation| FlagEvalEventContext {
evaluation: Some(evaluation),
dd: Some(ContextDD {
service: service.to_owned(),
}),
});

Ok(SidecarFfeFlagEvaluationEvent {
timestamp: event.timestamp_ms,
flag: FlagKey {
key: char_slice_to_string(event.flag_key)?,
},
first_evaluation: event.first_evaluation_ms,
last_evaluation: event.last_evaluation_ms,
evaluation_count: event.evaluation_count,
variant: optional_string(event.variant)?.map(|key| VariantKey { key }),
allocation: optional_string(event.allocation_key)?.map(|key| AllocationKey { key }),
targeting_rule: optional_string(event.targeting_rule_key)?
.map(|key| TargetingRuleKey { key }),
targeting_key: optional_string(event.targeting_key)?,
context,
error: optional_string(event.error_message)?.map(|message| EvalError { message }),
runtime_default_used: event.runtime_default_used,
})
}

fn ffe_metric_from_ffi(
metric: &FfeEvaluationMetric<'_>,
) -> Result<SidecarFfeEvaluationMetric, String> {
Expand All @@ -1327,6 +1451,21 @@ fn optional_string(slice: CharSlice) -> Result<Option<String>, String> {
}
}

/// Extracts an optional JSON object from `slice`, re-serialized by `serde_json`.
///
/// Yields `Ok(None)` when `optional_string` produces no string, when the text does
/// not parse as JSON, or when the parsed value is not a JSON object. The returned
/// text is the parsed value rendered back to a string, not the caller's original
/// bytes.
///
/// # Errors
/// Propagates the error returned by `optional_string`.
fn optional_json_object_string(slice: CharSlice) -> Result<Option<String>, String> {
    let object_json = optional_string(slice)?
        .and_then(|raw| serde_json::from_str::<serde_json::Value>(&raw).ok())
        .filter(serde_json::Value::is_object)
        .map(|value| value.to_string());
    Ok(object_json)
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
#[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals
Expand Down
2 changes: 1 addition & 1 deletion datadog-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" }
libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] }
libdd-remote-config = { path = "../libdd-remote-config" }
datadog-live-debugger = { path = "../datadog-live-debugger" }
datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] }
datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics", "flagevaluation-evp"] }
libdd-crashtracker = { path = "../libdd-crashtracker" }
libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" }
libdd-tinybytes = { path = "../libdd-tinybytes" }
Expand Down
65 changes: 65 additions & 0 deletions datadog-sidecar/src/self_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
// SPDX-License-Identifier: Apache-2.0
use crate::config::Config;
use crate::log;
use crate::service::ffe_flagevaluation_flusher::{
FLAG_EVALUATION_PAYLOAD_SPLITS_METRIC, FLAG_EVALUATION_REASON_CARDINALITY_CAP,
FLAG_EVALUATION_REASON_DEGRADED_CAP, FLAG_EVALUATION_REASON_PAYLOAD_LIMIT,
FLAG_EVALUATION_ROWS_DEGRADED_METRIC, FLAG_EVALUATION_ROWS_DROPPED_METRIC,
};
use crate::service::SidecarServer;
use crate::watchdog::WatchdogHandle;
use libdd_common::{tag, tag::Tag, MutexExt};
Expand Down Expand Up @@ -31,6 +36,9 @@ struct MetricData<'a> {
trace_api_bytes: ContextKey,
trace_chunks_sent: ContextKey,
trace_chunks_dropped: ContextKey,
flagevaluation_rows_dropped: ContextKey,
flagevaluation_rows_degraded: ContextKey,
flagevaluation_payload_splits: ContextKey,
}
impl MetricData<'_> {
async fn send(&self, key: ContextKey, value: f64, tags: Vec<Tag>) {
Expand All @@ -42,6 +50,7 @@ impl MetricData<'_> {

async fn collect_and_send(&self) {
let trace_metrics = self.server.trace_flusher.collect_metrics();
let flagevaluation_metrics = self.server.ffe_flagevaluation_coalescer.collect_metrics();

let submitted_payloads_delta = {
let mut counters = self.server.connection_counters.lock_or_panic();
Expand Down Expand Up @@ -154,6 +163,41 @@ impl MetricData<'_> {
],
));
}
if flagevaluation_metrics.rows_dropped_degraded_cap > 0 {
futures.push(self.send(
self.flagevaluation_rows_dropped,
flagevaluation_metrics.rows_dropped_degraded_cap as f64,
vec![tag!("reason", FLAG_EVALUATION_REASON_DEGRADED_CAP)],
));
}
if flagevaluation_metrics.rows_dropped_payload_limit > 0 {
futures.push(self.send(
self.flagevaluation_rows_dropped,
flagevaluation_metrics.rows_dropped_payload_limit as f64,
vec![tag!("reason", FLAG_EVALUATION_REASON_PAYLOAD_LIMIT)],
));
}
if flagevaluation_metrics.rows_degraded_cardinality_cap > 0 {
futures.push(self.send(
self.flagevaluation_rows_degraded,
flagevaluation_metrics.rows_degraded_cardinality_cap as f64,
vec![tag!("reason", FLAG_EVALUATION_REASON_CARDINALITY_CAP)],
));
}
if flagevaluation_metrics.rows_degraded_payload_limit > 0 {
futures.push(self.send(
self.flagevaluation_rows_degraded,
flagevaluation_metrics.rows_degraded_payload_limit as f64,
vec![tag!("reason", FLAG_EVALUATION_REASON_PAYLOAD_LIMIT)],
));
}
if flagevaluation_metrics.payload_splits > 0 {
futures.push(self.send(
self.flagevaluation_payload_splits,
flagevaluation_metrics.payload_splits as f64,
vec![],
));
}

futures::future::join_all(futures).await;
}
Expand Down Expand Up @@ -282,6 +326,27 @@ impl SelfTelemetry {
true,
MetricNamespace::Tracers,
),
flagevaluation_rows_dropped: worker.register_metric_context(
FLAG_EVALUATION_ROWS_DROPPED_METRIC.to_string(),
vec![],
MetricType::Count,
true,
MetricNamespace::Tracers,
),
flagevaluation_rows_degraded: worker.register_metric_context(
FLAG_EVALUATION_ROWS_DEGRADED_METRIC.to_string(),
vec![],
MetricType::Count,
true,
MetricNamespace::Tracers,
),
flagevaluation_payload_splits: worker.register_metric_context(
FLAG_EVALUATION_PAYLOAD_SPLITS_METRIC.to_string(),
vec![],
MetricType::Count,
true,
MetricNamespace::Tracers,
),
};

let _ = worker
Expand Down
28 changes: 27 additions & 1 deletion datadog-sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{
SessionConfig, SidecarAction, SidecarFlushOptions,
};
use crate::service::sender::SidecarSender;
use crate::service::sidecar_interface::SidecarInterfaceChannel;
use crate::service::sidecar_interface::{SidecarInterfaceChannel, SidecarInterfaceRequest};
use datadog_ipc::platform::{FileBackedHandle, ShmHandle};
use datadog_ipc::SeqpacketConn;
use datadog_live_debugger::debugger_defs::DebuggerPayload;
Expand Down Expand Up @@ -222,6 +222,32 @@ pub fn enqueue_actions(
Ok(())
}

/// Enqueues `actions` for the given instance and queue over the checked, blocking
/// channel path.
///
/// In contrast to [`enqueue_actions`], nothing is load-shed or silently dropped:
/// the `io::Result` of the send is returned to the caller. The request is encoded
/// once up front; on a broken pipe / connection reset / not-connected error the
/// transport reconnects and retries those exact pre-encoded bytes once on the
/// fresh connection.
///
/// Meant for one-shot, non-replayed payloads (for example FFE flagevaluation
/// batches) that must not be silently lost under transient backpressure or a
/// broken pipe.
pub fn enqueue_actions_reliable(
    transport: &mut SidecarTransport,
    instance_id: &InstanceId,
    queue_id: &QueueId,
    actions: Vec<SidecarAction>,
) -> io::Result<()> {
    let request = SidecarInterfaceRequest::EnqueueActions {
        instance_id: instance_id.clone(),
        queue_id: *queue_id,
        actions,
    };
    // Encode outside the retry closure so a retry resends identical bytes.
    let encoded = datadog_ipc::codec::encode(&request);
    transport.with_retry(|conn| conn.drain_and_send_raw_blocking(&encoded))
}

/// Removes the application entry for the given queue ID from the instance.
pub fn clear_queue_id(
transport: &mut SidecarTransport,
Expand Down
13 changes: 13 additions & 0 deletions datadog-sidecar/src/service/evp_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Shared EVP proxy constants for sidecar services.

/// EVP subdomain header name.
pub(crate) const SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain";

/// EVP subdomain that routes requests to event-platform intake.
pub(crate) const EVENT_PLATFORM_INTAKE_SUBDOMAIN: &str = "event-platform-intake";

/// EVP uncompressed request-body limit: 5 * 1024 * 1024 (5 MiB, presumably
/// counted in bytes — confirm against the flusher that enforces it).
pub(crate) const PAYLOAD_SIZE_LIMIT: usize = 5 * 1024 * 1024;
Loading
Loading