diff --git a/Cargo.lock b/Cargo.lock index fff06dbaa3..2c5c060d0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1353,6 +1353,7 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" name = "datadog-ffe" version = "1.0.0" dependencies = [ + "bincode", "chrono", "derive_more", "faststr", diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index b052368644..3d1c88caf0 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -30,9 +30,16 @@ libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true } prost = { version = "0.14.1", optional = true } pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } +[dev-dependencies] +# Matches the sidecar's bincode version. Used by the flagevaluation bincode +# round-trip test, which guards against `skip_serializing_if` reintroducing the +# worker→sidecar IPC field-misalignment bug (bincode is non-self-describing). +bincode = { version = "1.3.3" } + [features] default = ["remote-config"] exposure-events = ["dep:lru"] evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"] +flagevaluation-evp = [] pyo3 = ["dep:pyo3"] remote-config = ["dep:libdd-remote-config"] diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index bfa5ea72e6..ddbac1d2e7 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -6,7 +6,11 @@ mod flag_type; #[cfg(feature = "remote-config")] mod remote_config; pub mod rules_based; -#[cfg(any(feature = "exposure-events", feature = "evaluation-metrics"))] +#[cfg(any( + feature = "exposure-events", + feature = "evaluation-metrics", + feature = "flagevaluation-evp" +))] pub mod telemetry; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-ffe/src/telemetry/flagevaluation.rs b/datadog-ffe/src/telemetry/flagevaluation.rs new file mode 100644 index 0000000000..0cc6cdd9f0 --- /dev/null +++ b/datadog-ffe/src/telemetry/flagevaluation.rs @@ -0,0 +1,429 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! EVP flagevaluation payload and serialization primitives for the +//! `flageval-worker` ingestion schema. +//! +//! Crate-naming note: this workspace uses `libdd-remote-config` (not +//! `datadog-remote-config`) for the remote config crate. Downstream consumers +//! (e.g. `dd-trace-php`) must use `libdd-remote-config` in any import paths. +//! +//! Two-tier aggregation (full → degraded → drop-counted) and context pruning +//! are enforced by the caller (PHP sidecar bridge, 02-07). This module only +//! owns the payload types and serialization helpers. +//! +//! Serialization note (bincode wire vs EVP POST): these types cross the +//! worker→sidecar IPC boundary, which is encoded with **bincode** — a +//! non-self-describing format whose derived `Deserialize` reads every field in +//! declaration order. `#[serde(skip_serializing_if = ...)]` is therefore +//! **incompatible** with the bincode wire: a skipped field is omitted on +//! serialize but still expected on deserialize, causing field misalignment and +//! a dropped batch. For that reason **all fields here are always serialized** +//! (no `skip_serializing_if`). The flageval-worker EVP schema rejects null / +//! empty placeholders (especially for degraded-tier events), so the sidecar +//! flusher (`ffe_flagevaluation_flusher::build_payload`) strips null / empty +//! placeholder entries from the JSON before the HTTP POST, reproducing the old +//! skip semantics only on the outbound wire. `#[serde(default)]` is kept on +//! fields that have it for deserialize robustness. + +use super::FfeTelemetryContext; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +// ── Aggregation caps (frozen contract §1) ──────────────────────────────────── +/// Maximum number of distinct full-tier buckets across all flags. +pub const GLOBAL_CAP: usize = 131_072; +/// Maximum number of full-tier buckets for a single flag. +pub const PER_FLAG_CAP: usize = 10_000; +/// Maximum number of distinct degraded-tier buckets across all flags. +pub const DEGRADED_CAP: usize = 32_768; + +// ── Context pruning bounds (reviewer concern #1 review:4477935835) ──────────── +/// Maximum number of context fields to include in a full-tier event. +pub const MAX_CONTEXT_FIELDS: usize = 256; +/// Maximum byte length of a context field value string. Values exceeding this +/// are skipped entirely (not truncated) to avoid partial-data misattribution. +pub const MAX_FIELD_LENGTH: usize = 256; + +// ── Top-level batch ────────────────────────────────────────────────────────── + +/// Batch wrapper for EVP flagevaluation events. +/// +/// Serializes to: +/// ```json +/// { "context": { "service": "…", "env": "…", "version": "…" }, +/// "flagEvaluations": [ … ] } +/// ``` +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeFlagEvaluationBatch { + pub context: FfeTelemetryContext, + #[serde(rename = "flagEvaluations")] + pub flag_evaluations: Vec, +} + +// ── Per-event payload ──────────────────────────────────────────────────────── + +/// A single aggregated flag evaluation event. +/// +/// **All fields are always serialized** (no `skip_serializing_if`) so the type +/// is safe over the non-self-describing bincode IPC wire (see the module-level +/// serialization note). The degraded tier therefore serializes optional fields +/// as `null`/`false` on the wire; the sidecar flusher +/// (`ffe_flagevaluation_flusher::build_payload`) strips those null/empty +/// placeholders before the EVP POST so the flageval-worker schema sees no null +/// placeholders (reviewer concern #2 review:4477935835). +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeFlagEvaluationEvent { + /// Unix timestamp of the aggregation window (milliseconds). + pub timestamp: i64, + /// Required: the flag key. + pub flag: FlagKey, + /// Earliest evaluation in this bucket (milliseconds since epoch). + pub first_evaluation: i64, + /// Latest evaluation in this bucket (milliseconds since epoch). + pub last_evaluation: i64, + /// Number of evaluations folded into this bucket. + pub evaluation_count: u64, + + // Optional fields — present in the full tier, `None` in the degraded tier. + // Serialized as `null` on the bincode wire; the flusher strips them. + /// Variant key; absent when the evaluation returned the runtime default + /// (no variant assigned). + #[serde(default)] + pub variant: Option, + /// Allocation key from the UFC rule that produced this evaluation. + #[serde(default)] + pub allocation: Option, + /// Targeting rule key from UFC metadata. Omit when no real rule metadata exists. + #[serde(default)] + pub targeting_rule: Option, + /// Targeting key identifying the evaluation subject. + #[serde(default)] + pub targeting_key: Option, + /// Pruned evaluation context (≤256 fields, values ≤256 chars, skip-not-truncate). + #[serde(default)] + pub context: Option, + /// Evaluation error, if any. + #[serde(default)] + pub error: Option, + + // Optional field — may appear in either tier. + /// `true` when the evaluation returned the SDK runtime default (absent + /// variant, not a UFC-assigned variant). Serialized as `false` on the wire + /// when unset; the flusher strips the `false` placeholder before the POST. + /// `#[serde(default)]` keeps deserialization robust when the field is absent. + #[serde(default)] + pub runtime_default_used: bool, +} + +// ── Field sub-types ────────────────────────────────────────────────────────── + +/// Holds the flag key for the `flag` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FlagKey { + pub key: String, +} + +/// Holds the variant key for the `variant` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct VariantKey { + pub key: String, +} + +/// Holds the allocation key for the `allocation` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct AllocationKey { + pub key: String, +} + +/// Holds the targeting rule key for the `targeting_rule` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct TargetingRuleKey { + pub key: String, +} + +/// Holds the error message for the `error` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct EvalError { + pub message: String, +} + +/// Per-event context object. +/// +/// `evaluation` carries the pruned context attributes; `dd.service` carries the +/// originating service name for cross-service attribution. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FlagEvalEventContext { + /// Pruned evaluation context attributes (≤256 fields, values ≤256 chars), + /// carried over the wire as a **JSON-object string** (e.g. `{"plan":"premium"}`). + /// + /// The sidecar IPC codec is bincode, which cannot (de)serialize + /// `serde_json::Value` (it relies on `deserialize_any`, which bincode + /// rejects). To keep the bincode wire encodable, the pruned context is + /// stringified at event-build time and re-expanded into a JSON object by the + /// sidecar flusher (`ffe_flagevaluation_flusher::build_payload`) before the + /// EVP POST, so the on-the-wire EVP schema (`context.evaluation` as an + /// object) is unchanged. `Eq` is preserved because `String` is `Eq`. + /// + /// Always serialized (no `skip_serializing_if`) for bincode-wire safety; + /// the sidecar flusher strips it when `None` → `null`. + #[serde(default)] + pub evaluation: Option, + /// Datadog-specific context sub-object. Always serialized for bincode-wire + /// safety; the flusher strips it when `None` → `null` (and recursively + /// removes the enclosing `context` object if it becomes empty). + #[serde(default)] + pub dd: Option, +} + +/// Datadog-specific context fields inside the per-event `context.dd` object. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct ContextDD { + /// Originating service name. Always serialized for bincode-wire safety; the + /// flusher strips it when empty (`""`), reproducing the old + /// `skip_serializing_if = "String::is_empty"` semantics on the POST. + #[serde(default)] + pub service: String, +} + +// ── Context pruning ────────────────────────────────────────────────────────── + +/// Prune evaluation context attributes to satisfy the frozen contract bounds: +/// - At most `MAX_CONTEXT_FIELDS` (256) entries are kept. +/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** (not truncated) to +/// avoid partial-data misattribution. +/// - Non-string values (bool, number, null) are kept regardless of their display length. +/// - Keys are iterated in sorted order for deterministic canonical-key stability; the returned +/// `BTreeMap` preserves that order. +/// +/// This satisfies reviewer concern #1 (`review:4477935835`). +pub fn prune_context( + attrs: &BTreeMap, +) -> BTreeMap { + attrs + .iter() + .filter(|(_, v)| { + // Skip string values that exceed the per-field byte limit. + if let serde_json::Value::String(s) = v { + s.len() <= MAX_FIELD_LENGTH + } else { + true + } + }) + .take(MAX_CONTEXT_FIELDS) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::{json, Value}; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn full_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 42, + variant: Some(VariantKey { + key: "on".to_owned(), + }), + allocation: Some(AllocationKey { + key: "alloc-a".to_owned(), + }), + targeting_key: Some("user-123".to_owned()), + targeting_rule: None, + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), json!("premium")); + m + }) + .unwrap(), + ), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }), + error: None, + runtime_default_used: false, + } + } + + // ── Test: required fields present in serialized JSON ────────────────────── + + #[test] + fn fully_populated_event_serializes_required_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let json = serde_json::to_string(&batch).unwrap(); + let v: Value = serde_json::from_str(&json).unwrap(); + + assert_eq!(v["context"]["service"], "svc"); + assert_eq!(v["context"]["env"], "prod"); + assert_eq!(v["context"]["version"], "1"); + + let ev = &v["flagEvaluations"][0]; + assert_eq!(ev["flag"]["key"], "my-flag"); + assert!(ev["first_evaluation"].is_number()); + assert!(ev["last_evaluation"].is_number()); + assert_eq!(ev["evaluation_count"], 42); + assert_eq!(ev["variant"]["key"], "on"); + assert_eq!(ev["allocation"]["key"], "alloc-a"); + assert_eq!(ev["targeting_key"], "user-123"); + } + + fn degraded_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "flag-b".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 7, + variant: None, + allocation: None, + targeting_rule: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + } + } + + // ── Test: degraded-tier event serializes optional fields as null ────────── + // + // The type does not use `skip_serializing_if` (bincode-wire safety), so on + // the wire `None`/`false` optional fields ARE present (as null/false). The + // null-placeholder stripping that the flageval-worker schema requires + // happens in the sidecar flusher's `build_payload`, not at the type level. + + #[test] + fn degraded_tier_event_serializes_optional_fields_as_null() { + let degraded = degraded_event(); + let json = serde_json::to_string(°raded).unwrap(); + let v: Value = serde_json::from_str(&json).unwrap(); + + // Required fields present. + assert_eq!(v["flag"]["key"], "flag-b"); + assert!(v["first_evaluation"].is_number()); + assert!(v["last_evaluation"].is_number()); + assert_eq!(v["evaluation_count"], 7); + + // Optional fields are present as null/false placeholders on the wire + // (stripped later by the flusher, not at the type level). + assert!(v["variant"].is_null(), "variant should serialize as null"); + assert!( + v["allocation"].is_null(), + "allocation should serialize as null" + ); + assert!( + v["targeting_rule"].is_null(), + "targeting_rule should serialize as null" + ); + assert!( + v["targeting_key"].is_null(), + "targeting_key should serialize as null" + ); + assert!(v["context"].is_null(), "context should serialize as null"); + assert!(v["error"].is_null(), "error should serialize as null"); + assert_eq!( + v["runtime_default_used"], false, + "runtime_default_used should serialize as false" + ); + } + + // ── Test: bincode round-trip with mixed Some/None fields ────────────────── + // + // Mechanical guard for the worker→sidecar IPC bug: bincode is a + // non-self-describing codec, so any `skip_serializing_if` on these types + // would omit a field on serialize while derived Deserialize still expects it + // in order → field misalignment → the sidecar drops the batch. A batch + // mixing a full-tier event (Some fields) and degraded-tier event (None + // fields) must survive serialize→deserialize byte-for-byte. + + #[test] + fn batch_round_trips_via_bincode_with_mixed_optional_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event(), degraded_event()], + }; + let bytes = bincode::serialize(&batch).expect("bincode serialize must succeed"); + let decoded: FfeFlagEvaluationBatch = + bincode::deserialize(&bytes).expect("bincode deserialize must succeed"); + assert_eq!( + batch, decoded, + "bincode round-trip must be lossless for a batch mixing Some and None fields" + ); + } + + // ── Test: context pruning — 256-field limit ─────────────────────────────── + + #[test] + fn context_pruning_keeps_at_most_256_fields() { + let mut attrs = BTreeMap::new(); + for i in 0..300usize { + attrs.insert(format!("key{i:04}"), json!(i.to_string())); + } + let pruned = prune_context(&attrs); + assert_eq!( + pruned.len(), + MAX_CONTEXT_FIELDS, + "pruned context must have at most {MAX_CONTEXT_FIELDS} fields" + ); + } + + // ── Test: context pruning — skip string values > 256 chars ─────────────── + + #[test] + fn context_pruning_skips_oversized_string_values() { + let mut attrs = BTreeMap::new(); + let long_value = "x".repeat(MAX_FIELD_LENGTH + 1); + attrs.insert("oversized".to_owned(), json!(long_value)); + attrs.insert("ok".to_owned(), json!("short")); + // Non-string values are kept regardless of length. + attrs.insert("num".to_owned(), json!(12345)); + + let pruned = prune_context(&attrs); + assert!( + !pruned.contains_key("oversized"), + "oversized string value must be skipped" + ); + assert!(pruned.contains_key("ok"), "short string value must be kept"); + assert!( + pruned.contains_key("num"), + "numeric value must be kept regardless of length" + ); + } + + // ── Test: batch round-trips via serde ──────────────────────────────────── + + #[test] + fn batch_round_trips_via_serde() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let json = serde_json::to_string(&batch).unwrap(); + let decoded: FfeFlagEvaluationBatch = serde_json::from_str(&json).unwrap(); + assert_eq!(batch, decoded); + } +} diff --git a/datadog-ffe/src/telemetry/mod.rs b/datadog-ffe/src/telemetry/mod.rs index 6b5e851025..4fad83f38c 100644 --- a/datadog-ffe/src/telemetry/mod.rs +++ b/datadog-ffe/src/telemetry/mod.rs @@ -5,6 +5,8 @@ pub mod evaluation_metrics; #[cfg(feature = "exposure-events")] pub mod exposures; +#[cfg(feature = "flagevaluation-evp")] +pub mod flagevaluation; use serde::{Deserialize, Serialize}; diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index f3c5c97254..4063c3ccaf 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -25,10 +25,14 @@ use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, - DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric, - FfeExposure as SidecarFfeExposure, FfeExposureBatch as SidecarFfeExposureBatch, - FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata, - SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions, + AllocationKey, ContextDD, DynamicInstrumentationConfigState, EvalError, + FfeEvaluationMetric as SidecarFfeEvaluationMetric, FfeExposure as SidecarFfeExposure, + FfeExposureBatch as SidecarFfeExposureBatch, + FfeFlagEvaluationBatch as SidecarFfeFlagEvaluationBatch, + FfeFlagEvaluationEvent as SidecarFfeFlagEvaluationEvent, + FfeTelemetryContext as SidecarFfeTelemetryContext, FlagEvalEventContext, FlagKey, InstanceId, + QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, + SidecarFlushOptions, TargetingRuleKey, VariantKey, }; use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions}; use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader}; @@ -1176,6 +1180,23 @@ pub struct FfeEvaluationMetric<'a> { pub allocation_key: CharSlice<'a>, } +#[repr(C)] +pub struct FfeFlagEvaluation<'a> { + pub timestamp_ms: i64, + pub flag_key: CharSlice<'a>, + pub first_evaluation_ms: i64, + pub last_evaluation_ms: i64, + pub evaluation_count: u64, + pub variant: CharSlice<'a>, + pub allocation_key: CharSlice<'a>, + pub targeting_rule_key: CharSlice<'a>, + pub targeting_key: CharSlice<'a>, + /// UTF-8 JSON object. Empty, invalid, or non-object JSON is omitted. + pub evaluation_context_json: CharSlice<'a>, + pub error_message: CharSlice<'a>, + pub runtime_default_used: bool, +} + /// Send structured FFE exposure events to the sidecar. The sidecar owns /// deduplication, JSON serialization, and Agent EVP delivery. This function is /// caller-driven; shared libdatadog evaluator calls do not log unless an SDK @@ -1247,6 +1268,78 @@ fn ddog_sidecar_send_ffe_exposure_batch_impl( MaybeError::None } +/// Send structured FFE flag evaluation events to the sidecar. The sidecar owns +/// JSON serialization and Agent EVP delivery. This function is caller-driven; +/// callers must aggregate and bound event cardinality before passing a batch. +/// +/// # Safety +/// `context` and every element in `flag_evaluations` must contain valid UTF-8 +/// `CharSlice` values. Empty `flag_evaluations` is a no-op. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_ffe_flag_evaluation_batch( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + context: &FfeTelemetryContext<'_>, + flag_evaluations: Slice>, +) -> MaybeError { + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + ddog_sidecar_send_ffe_flag_evaluation_batch_impl( + transport, + instance_id, + queue_id, + context, + flag_evaluations, + ) + })) + .unwrap_or_else(|panic| { + MaybeError::Some(libdd_common_ffi::utils::handle_panic_error( + panic, + "ddog_sidecar_send_ffe_flag_evaluation_batch", + )) + }) +} + +fn ddog_sidecar_send_ffe_flag_evaluation_batch_impl( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + context: &FfeTelemetryContext<'_>, + flag_evaluations: Slice>, +) -> MaybeError { + let flag_evaluations = try_c!(flag_evaluations + .try_as_slice() + .map_err(|e| format!("Invalid flag evaluation slice: {e}"))); + + if flag_evaluations.is_empty() { + return MaybeError::None; + } + + let context = try_c!(ffe_context_from_ffi(context)); + let flag_evaluations = try_c!(flag_evaluations + .iter() + .map(|event| ffe_flag_evaluation_from_ffi(event, &context.service)) + .collect::, _>>()); + + if flag_evaluations.is_empty() { + return MaybeError::None; + } + + try_c!(blocking::enqueue_actions_reliable( + transport, + instance_id, + queue_id, + vec![SidecarAction::FfeFlagEvaluationBatch( + SidecarFfeFlagEvaluationBatch { + context, + flag_evaluations, + } + )], + )); + MaybeError::None +} + /// Send structured FFE evaluation metric events to the sidecar. The sidecar /// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This /// function is caller-driven so SDKs with existing host-language hooks can @@ -1311,6 +1404,37 @@ fn ffe_exposure_from_ffi(exposure: &FfeExposure<'_>) -> Result, + service: &str, +) -> Result { + let evaluation = optional_json_object_string(event.evaluation_context_json)?; + let context = evaluation.map(|evaluation| FlagEvalEventContext { + evaluation: Some(evaluation), + dd: Some(ContextDD { + service: service.to_owned(), + }), + }); + + Ok(SidecarFfeFlagEvaluationEvent { + timestamp: event.timestamp_ms, + flag: FlagKey { + key: char_slice_to_string(event.flag_key)?, + }, + first_evaluation: event.first_evaluation_ms, + last_evaluation: event.last_evaluation_ms, + evaluation_count: event.evaluation_count, + variant: optional_string(event.variant)?.map(|key| VariantKey { key }), + allocation: optional_string(event.allocation_key)?.map(|key| AllocationKey { key }), + targeting_rule: optional_string(event.targeting_rule_key)? + .map(|key| TargetingRuleKey { key }), + targeting_key: optional_string(event.targeting_key)?, + context, + error: optional_string(event.error_message)?.map(|message| EvalError { message }), + runtime_default_used: event.runtime_default_used, + }) +} + fn ffe_metric_from_ffi( metric: &FfeEvaluationMetric<'_>, ) -> Result { @@ -1331,6 +1455,21 @@ fn optional_string(slice: CharSlice) -> Result, String> { } } +fn optional_json_object_string(slice: CharSlice) -> Result, String> { + let Some(raw) = optional_string(slice)? else { + return Ok(None); + }; + let value = match serde_json::from_str::(&raw) { + Ok(value) => value, + Err(_) => return Ok(None), + }; + if value.is_object() { + Ok(Some(value.to_string())) + } else { + Ok(None) + } +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 443b292b32..c8da740354 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -29,7 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } libdd-remote-config = { path = "../libdd-remote-config" } datadog-live-debugger = { path = "../datadog-live-debugger" } -datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] } +datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics", "flagevaluation-evp"] } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } libdd-tinybytes = { path = "../libdd-tinybytes" } diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 80a9257384..3b88c5ea21 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -6,7 +6,7 @@ use super::{ SessionConfig, SidecarAction, SidecarFlushOptions, }; use crate::service::sender::SidecarSender; -use crate::service::sidecar_interface::SidecarInterfaceChannel; +use crate::service::sidecar_interface::{SidecarInterfaceChannel, SidecarInterfaceRequest}; use datadog_ipc::platform::{FileBackedHandle, ShmHandle}; use datadog_ipc::SeqpacketConn; use datadog_live_debugger::debugger_defs::DebuggerPayload; @@ -222,6 +222,32 @@ pub fn enqueue_actions( Ok(()) } +/// Reliably enqueues a list of actions to be performed. +/// +/// Unlike [`enqueue_actions`], this uses the checked, blocking channel path with +/// no load-shedding and no silent drop: the `io::Result` from the send +/// propagates to the caller. On a broken pipe / connection reset / +/// not-connected error the transport reconnects and retries the exact same +/// pre-encoded request bytes once on the fresh connection. +/// +/// Intended for one-shot, non-replayed payloads (for example FFE +/// flagevaluation batches) that must not be silently lost under transient +/// backpressure or a broken pipe. +pub fn enqueue_actions_reliable( + transport: &mut SidecarTransport, + instance_id: &InstanceId, + queue_id: &QueueId, + actions: Vec, +) -> io::Result<()> { + let req = SidecarInterfaceRequest::EnqueueActions { + instance_id: instance_id.clone(), + queue_id: *queue_id, + actions, + }; + let data = datadog_ipc::codec::encode(&req); + transport.with_retry(|sender| sender.drain_and_send_raw_blocking(&data)) +} + /// Removes the application entry for the given queue ID from the instance. pub fn clear_queue_id( transport: &mut SidecarTransport, diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs new file mode 100644 index 0000000000..c3324dd3a7 --- /dev/null +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -0,0 +1,1048 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Serializes and forwards FFE (Feature Flag Evaluation) flag evaluation +//! batches to the Datadog Agent's EVP proxy. +//! +//! Protocol: `POST /evp_proxy/v2/api/v2/flagevaluations` with the header +//! `X-Datadog-EVP-Subdomain: event-platform-intake`. Fire-and-forget: non-2xx +//! responses are logged at `warn`, network errors at `debug`, and dropped +//! (matches dd-trace-go behaviour). No agent capability gate. + +use crate::service::{FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, FfeTelemetryContext}; +use datadog_ffe::telemetry::flagevaluation::{DEGRADED_CAP, GLOBAL_CAP, PER_FLAG_CAP}; +use http::uri::PathAndQuery; +use http::Method; +use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_capabilities_impl::NativeCapabilities; +use libdd_common::Endpoint; +use libdd_common::MutexExt; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tracing::{debug, warn}; + +/// EVP proxy path for FFE flag evaluation intake. +pub(crate) const EVP_FLAGEVALUATIONS_PATH: &str = "/evp_proxy/v2/api/v2/flagevaluations"; + +/// EVP subdomain that routes requests to event-platform intake. +pub(crate) const EVP_SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain"; +pub(crate) const EVP_SUBDOMAIN_VALUE: &str = "event-platform-intake"; + +const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); +const COALESCE_DELAY: Duration = Duration::from_millis(250); +const MAX_PENDING_BUCKETS: usize = GLOBAL_CAP + DEGRADED_CAP; +const MAX_EVENTS_PER_POST: usize = 512; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct DestinationKey { + url: String, + timeout_ms: u64, + test_token: Option, + use_system_resolver: bool, + context: FfeTelemetryContext, +} + +impl DestinationKey { + fn new(endpoint: &Endpoint, context: &FfeTelemetryContext) -> Self { + Self { + url: endpoint.url.to_string(), + timeout_ms: endpoint.timeout_ms, + test_token: endpoint.test_token.as_ref().map(|s| s.to_string()), + use_system_resolver: endpoint.use_system_resolver, + context: context.clone(), + } + } +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct EventKey { + flag_key: String, + variant_key: Option, + allocation_key: Option, + targeting_rule_key: Option, + targeting_key: Option, + context_evaluation: Option, + context_dd_service: Option, + error_message: Option, + runtime_default_used: bool, +} + +impl EventKey { + fn new(event: &FfeFlagEvaluationEvent) -> Self { + Self { + flag_key: event.flag.key.clone(), + variant_key: event.variant.as_ref().map(|v| v.key.clone()), + allocation_key: event.allocation.as_ref().map(|a| a.key.clone()), + targeting_rule_key: event.targeting_rule.as_ref().map(|r| r.key.clone()), + targeting_key: event.targeting_key.clone(), + context_evaluation: event + .context + .as_ref() + .and_then(|context| context.evaluation.clone()), + context_dd_service: event + .context + .as_ref() + .and_then(|context| context.dd.as_ref().map(|dd| dd.service.clone())), + error_message: event.error.as_ref().map(|e| e.message.clone()), + runtime_default_used: event.runtime_default_used, + } + } + + fn degraded(event: &FfeFlagEvaluationEvent) -> Self { + Self { + flag_key: event.flag.key.clone(), + variant_key: event.variant.as_ref().map(|v| v.key.clone()), + allocation_key: event.allocation.as_ref().map(|a| a.key.clone()), + targeting_rule_key: event.targeting_rule.as_ref().map(|r| r.key.clone()), + targeting_key: None, + context_evaluation: None, + context_dd_service: None, + error_message: event.error.as_ref().map(|e| e.message.clone()), + runtime_default_used: event.runtime_default_used, + } + } +} + +struct PendingDestination { + endpoint: Endpoint, + context: FfeTelemetryContext, + events: HashMap, +} + +#[derive(Default)] +struct CoalescerState { + destinations: HashMap, + flush_running: bool, + pending_bucket_count: usize, + full_bucket_count: usize, + full_bucket_count_by_flag: HashMap, + degraded_bucket_count: usize, + dropped_overflow: u64, +} + +#[derive(Clone, Default)] +pub(crate) struct FlagEvaluationCoalescer { + state: Arc>, +} + +impl FlagEvaluationCoalescer { + pub(crate) fn enqueue( + &self, + client: NativeCapabilities, + endpoint: Endpoint, + batch: FfeFlagEvaluationBatch, + ) { + if batch.flag_evaluations.is_empty() { + return; + } + + let mut state = self.state.lock_or_panic(); + let destination_key = DestinationKey::new(&endpoint, &batch.context); + state + .destinations + .entry(destination_key.clone()) + .or_insert_with(|| PendingDestination { + endpoint, + context: batch.context, + events: HashMap::new(), + }); + + for mut event in batch.flag_evaluations { + let key = EventKey::new(&event); + if merge_pending_event(&mut state, &destination_key, &key, &event) { + continue; + } + + let flag_key = event.flag.key.clone(); + let full_bucket_count_for_flag = state + .full_bucket_count_by_flag + .get(&flag_key) + .copied() + .unwrap_or(0); + + if state.full_bucket_count < GLOBAL_CAP && full_bucket_count_for_flag < PER_FLAG_CAP { + insert_pending_event(&mut state, &destination_key, key, event); + state.full_bucket_count += 1; + *state.full_bucket_count_by_flag.entry(flag_key).or_default() += 1; + continue; + } + + event.targeting_key = None; + event.context = None; + let degraded_key = EventKey::degraded(&event); + if merge_pending_event(&mut state, &destination_key, °raded_key, &event) { + continue; + } + + if state.degraded_bucket_count >= DEGRADED_CAP + || state.pending_bucket_count >= MAX_PENDING_BUCKETS + { + state.dropped_overflow = state.dropped_overflow.saturating_add(1); + continue; + } + + insert_pending_event(&mut state, &destination_key, degraded_key, event); + state.degraded_bucket_count += 1; + } + + if !state.flush_running { + state.flush_running = true; + let coalescer = self.clone(); + tokio::spawn(async move { + coalescer.flush_loop(client).await; + }); + } + } + + pub(crate) async fn flush_now(&self, client: NativeCapabilities) { + let batches = self.take_batches(); + futures::future::join_all(batches.into_iter().map(|(endpoint, batch)| { + let client = client.clone(); + async move { send_batch(&client, &endpoint, batch).await } + })) + .await; + } + + async fn flush_loop(self, client: NativeCapabilities) { + loop { + tokio::time::sleep(COALESCE_DELAY).await; + let batches = self.take_batches(); + futures::future::join_all(batches.into_iter().map(|(endpoint, batch)| { + let client = client.clone(); + async move { send_batch(&client, &endpoint, batch).await } + })) + .await; + + let mut state = self.state.lock_or_panic(); + if state.destinations.is_empty() { + state.flush_running = false; + break; + } + } + } + + fn take_batches(&self) -> Vec<(Endpoint, FfeFlagEvaluationBatch)> { + let mut state = self.state.lock_or_panic(); + if state.dropped_overflow > 0 { + warn!( + "ffe_flagevaluation_flusher: dropped {} pending bucket(s) after sidecar coalescer cap", + state.dropped_overflow + ); + state.dropped_overflow = 0; + } + + let destinations = std::mem::take(&mut state.destinations); + state.pending_bucket_count = 0; + state.full_bucket_count = 0; + state.full_bucket_count_by_flag.clear(); + state.degraded_bucket_count = 0; + destinations + .into_values() + .filter_map(|pending| { + if pending.events.is_empty() { + return None; + } + Some(( + pending.endpoint, + FfeFlagEvaluationBatch { + context: pending.context, + flag_evaluations: pending.events.into_values().collect(), + }, + )) + }) + .collect() + } +} + +fn merge_pending_event( + state: &mut CoalescerState, + destination_key: &DestinationKey, + key: &EventKey, + event: &FfeFlagEvaluationEvent, +) -> bool { + let pending = state + .destinations + .get_mut(destination_key) + .expect("destination was inserted before event merge"); + if let Some(existing) = pending.events.get_mut(key) { + merge_event(existing, event); + true + } else { + false + } +} + +fn insert_pending_event( + state: &mut CoalescerState, + destination_key: &DestinationKey, + key: EventKey, + event: FfeFlagEvaluationEvent, +) { + state + .destinations + .get_mut(destination_key) + .expect("destination was inserted before event insert") + .events + .insert(key, event); + state.pending_bucket_count += 1; +} + +fn merge_event(existing: &mut FfeFlagEvaluationEvent, incoming: &FfeFlagEvaluationEvent) { + existing.timestamp = existing.timestamp.max(incoming.timestamp); + existing.first_evaluation = existing.first_evaluation.min(incoming.first_evaluation); + existing.last_evaluation = existing.last_evaluation.max(incoming.last_evaluation); + existing.evaluation_count = existing + .evaluation_count + .saturating_add(incoming.evaluation_count); +} + +/// Build the FFE flagevaluation endpoint from a session's agent base endpoint. +/// Overrides only the path (`/evp_proxy/v2/api/v2/flagevaluations`), preserving +/// scheme, authority, timeout, and test_token. +/// Returns `None` for agentless mode because EVP proxy routing is agent-only. +pub(crate) fn flagevaluation_endpoint(base: &Endpoint) -> Option { + if base.api_key.is_some() { + return None; + } + + let mut parts = base.url.clone().into_parts(); + parts.path_and_query = Some(PathAndQuery::from_static(EVP_FLAGEVALUATIONS_PATH)); + let url = http::Uri::from_parts(parts).ok()?; + Some(Endpoint { + url, + ..base.clone() + }) +} + +/// POST a structured FFE flag evaluation batch to the agent EVP proxy. +/// Fire-and-forget: non-2xx responses are logged at `warn`, network errors at +/// `debug`, and dropped (matches dd-trace-go behaviour). +pub(crate) async fn send_batch( + client: &C, + endpoint: &Endpoint, + batch: FfeFlagEvaluationBatch, +) { + for chunk in split_batch_for_post(batch) { + let payload = match build_payload(&chunk) { + Ok(p) => p, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to encode batch payload: {e:?}"); + return; + } + }; + send_payload(client, endpoint, payload).await; + } +} + +fn split_batch_for_post(batch: FfeFlagEvaluationBatch) -> Vec { + let FfeFlagEvaluationBatch { + context, + flag_evaluations, + } = batch; + + flag_evaluations + .chunks(MAX_EVENTS_PER_POST) + .map(|chunk| FfeFlagEvaluationBatch { + context: context.clone(), + flag_evaluations: chunk.to_vec(), + }) + .collect() +} + +/// Build the EVP POST body from a batch. +/// +/// The flagevaluation types are serialized over the sidecar's **bincode** IPC +/// wire, which is non-self-describing: a field omitted by `skip_serializing_if` +/// would misalign the derived `Deserialize` and cause the sidecar to drop the +/// whole batch. The types therefore carry **no** `skip_serializing_if` and emit +/// every field (optional ones as `null`/`false`/`""`). The flageval-worker EVP +/// schema, however, rejects those null/empty placeholders (especially for +/// degraded-tier events), so this helper strips them here, on the outbound POST +/// only — reproducing the old skip-serialization semantics without breaking the +/// bincode wire. +/// +/// Two transforms happen, in order, on each `flagEvaluations` element: +/// 1. `context.evaluation` is carried as a JSON-object **string** (bincode cannot encode +/// `serde_json::Value`); it is re-expanded back into a JSON **object** in place. An +/// unparseable string drops the field gracefully (never panics), matching the best-effort +/// telemetry contract. +/// 2. The whole value is recursively cleaned (`strip_placeholders`) so the POST contains no +/// optional-field placeholders. `context.evaluation` user values are preserved as-is; boolean +/// `false`, empty strings, empty objects, and empty arrays are valid context values. Numeric +/// zeros (timestamps/counts) are preserved — they are real data. +fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result { + let mut value = serde_json::to_value(batch)?; + + if let Some(events) = value + .get_mut("flagEvaluations") + .and_then(serde_json::Value::as_array_mut) + { + for event in events { + let Some(context) = event.get_mut("context") else { + continue; + }; + let Some(evaluation) = context.get_mut("evaluation") else { + continue; + }; + if let Some(s) = evaluation.as_str() { + match serde_json::from_str::(s) { + // Re-expand the JSON-object string into an object in place. + Ok(parsed) => *evaluation = parsed, + // Unparseable string → drop the field gracefully (never panic). + Err(_) => { + if let Some(obj) = context.as_object_mut() { + obj.remove("evaluation"); + } + } + } + } + } + } + + // Strip null/empty placeholders so the EVP POST matches the flageval-worker + // schema (which rejects null placeholders) — see the function doc comment. + strip_placeholders(&mut value); + + serde_json::to_string(&value) +} + +/// Recursively remove placeholder entries from a JSON value so the EVP POST +/// carries no null/empty fields, reproducing the old `skip_serializing_if` +/// semantics on the outbound wire only. +/// +/// An object entry is dropped when its value, after the children have +/// themselves been cleaned (bottom-up), is one of: +/// - `null` (was `Option::is_none`) +/// - `false` for `runtime_default_used` +/// - `""` for `service` +/// - `{}` (an object that became empty after cleaning, e.g. a `context.dd` whose +/// only field `service` was stripped) +/// - `[]` (an array that became empty after cleaning) +/// +/// `context.evaluation` is not cleaned recursively because its children are +/// user context values, not optional-field placeholders. +/// +/// Numeric values (including `0`) are NEVER removed — timestamps and counts are +/// real data. Non-zero bools (`true`) and non-empty strings/collections are +/// kept. +fn strip_placeholders(value: &mut serde_json::Value) { + match value { + serde_json::Value::Object(map) => { + // Clean children first (bottom-up), then drop entries that are now + // placeholders, so a container emptied by cleaning is itself removed. + for (key, child) in map.iter_mut() { + // `context.evaluation` contains user context values. Boolean + // false, empty strings, empty objects, and empty arrays are + // valid there and must not be stripped as optional-field + // placeholders. + if key != "evaluation" { + strip_placeholders(child); + } + } + map.retain(|key, v| !is_placeholder(key, v)); + } + serde_json::Value::Array(items) => { + for item in items.iter_mut() { + strip_placeholders(item); + } + items.retain(|v| !is_array_placeholder(v)); + } + _ => {} + } +} + +/// Whether a (already-cleaned) JSON value is an empty/null placeholder that +/// should be dropped from the POST. Numeric zeros are NOT placeholders. +fn is_placeholder(key: &str, value: &serde_json::Value) -> bool { + match value { + serde_json::Value::Null => true, + serde_json::Value::Bool(b) => key == "runtime_default_used" && !b, + serde_json::Value::String(s) => key == "service" && s.is_empty(), + serde_json::Value::Object(map) => map.is_empty(), + serde_json::Value::Array(items) => items.is_empty(), + // Numbers (incl. 0) are real data — never placeholders. + serde_json::Value::Number(_) => false, + } +} + +fn is_array_placeholder(value: &serde_json::Value) -> bool { + match value { + serde_json::Value::Null => true, + serde_json::Value::Object(map) => map.is_empty(), + serde_json::Value::Array(items) => items.is_empty(), + _ => false, + } +} + +async fn send_payload( + client: &C, + endpoint: &Endpoint, + payload: String, +) { + let builder = match endpoint.to_request_builder(USER_AGENT) { + Ok(b) => b, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to build request: {e:?}"); + return; + } + }; + + let req = match builder + .method(Method::POST) + .header("Content-Type", "application/json") + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .body(Bytes::from(payload)) + { + Ok(r) => r, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to construct request body: {e:?}"); + return; + } + }; + + let timeout = Duration::from_millis(endpoint.timeout_ms); + let response = tokio::select! { + biased; + result = client.request(req) => result, + _ = client.sleep(timeout) => { + debug!("ffe_flagevaluation_flusher: request timed out after {timeout:?}"); + return; + } + }; + + match response { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + let body_preview = truncate(resp.body().as_ref(), 256); + warn!("ffe_flagevaluation_flusher: non-2xx response {status}: {body_preview}"); + } else { + debug!("ffe_flagevaluation_flusher: sent flag evaluation batch, status={status}"); + } + } + Err(e) => { + debug!("ffe_flagevaluation_flusher: request failed: {e:?}"); + } + } +} + +fn truncate(bytes: &[u8], cap: usize) -> String { + let take = bytes.len().min(cap); + String::from_utf8_lossy(&bytes[..take]).into_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; + use datadog_ffe::telemetry::flagevaluation::{ + AllocationKey, ContextDD, EvalError, FfeFlagEvaluationEvent, FlagEvalEventContext, FlagKey, + TargetingRuleKey, VariantKey, PER_FLAG_CAP, + }; + use httpmock::MockServer; + use libdd_capabilities::{HttpError, MaybeSend}; + use libdd_capabilities_impl::NativeCapabilities; + use std::collections::BTreeMap; + use std::future; + + fn endpoint_for(server: &MockServer) -> Endpoint { + Endpoint { + url: server.url("/").parse().unwrap(), + ..Endpoint::default() + } + } + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn eval_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 5, + variant: None, + allocation: None, + targeting_rule: None, + targeting_key: None, + // `evaluation` is carried as a JSON-object STRING on the wire (bincode + // can't carry serde_json::Value); the flusher re-expands it to an object. + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("country".to_owned(), serde_json::json!("US")); + m + }) + .unwrap(), + ), + dd: None, + }), + error: None, + runtime_default_used: false, + } + } + + fn batch() -> FfeFlagEvaluationBatch { + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![eval_event()], + } + } + + fn full_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 42, + variant: Some(VariantKey { + key: "on".to_owned(), + }), + allocation: Some(AllocationKey { + key: "alloc-a".to_owned(), + }), + targeting_rule: Some(TargetingRuleKey { + key: "rule-1".to_owned(), + }), + targeting_key: Some("user-123".to_owned()), + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), serde_json::json!("premium")); + m + }) + .unwrap(), + ), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }), + error: Some(EvalError { + message: "boom".to_owned(), + }), + runtime_default_used: true, + } + } + + fn degraded_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "flag-b".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 7, + variant: None, + allocation: None, + targeting_rule: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + } + } + + #[test] + fn build_payload_strips_degraded_tier_placeholders() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![degraded_event()], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + let ev = &v["flagEvaluations"][0]; + + assert_eq!(ev["flag"]["key"], "flag-b"); + assert_eq!(ev["evaluation_count"], 7); + assert!(ev["first_evaluation"].is_number()); + assert!(ev["last_evaluation"].is_number()); + assert!(ev["timestamp"].is_number()); + + assert!(ev.get("variant").is_none(), "variant must be stripped"); + assert!( + ev.get("allocation").is_none(), + "allocation must be stripped" + ); + assert!( + ev.get("targeting_rule").is_none(), + "targeting_rule must be stripped" + ); + assert!( + ev.get("targeting_key").is_none(), + "targeting_key must be stripped" + ); + assert!(ev.get("context").is_none(), "context must be stripped"); + assert!(ev.get("error").is_none(), "error must be stripped"); + assert!( + ev.get("runtime_default_used").is_none(), + "runtime_default_used=false must be stripped" + ); + } + + #[test] + fn build_payload_keeps_full_tier_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + let ev = &v["flagEvaluations"][0]; + + assert_eq!(ev["variant"]["key"], "on", "variant must be kept"); + assert_eq!( + ev["allocation"]["key"], "alloc-a", + "allocation must be kept" + ); + assert_eq!( + ev["targeting_rule"]["key"], "rule-1", + "targeting_rule must be kept" + ); + assert_eq!( + ev["targeting_key"], "user-123", + "targeting_key must be kept" + ); + assert_eq!(ev["error"]["message"], "boom", "error must be kept"); + assert_eq!( + ev["runtime_default_used"], true, + "runtime_default_used=true must be kept" + ); + assert!( + ev.get("reason").is_none(), + "EVP payload must not emit top-level OpenFeature reason" + ); + + let ctx = &ev["context"]; + assert!( + ctx["evaluation"].is_object(), + "context.evaluation must be an object: {}", + ctx["evaluation"] + ); + assert_eq!(ctx["evaluation"]["plan"], "premium"); + assert_eq!( + ctx["dd"]["service"], "frontend", + "context.dd.service must be kept" + ); + } + + #[test] + fn build_payload_collapses_empty_nested_context() { + let mut ev = degraded_event(); + ev.context = Some(FlagEvalEventContext { + evaluation: None, + dd: Some(ContextDD { + service: String::new(), + }), + }); + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![ev], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + + assert!( + v["flagEvaluations"][0].get("context").is_none(), + "a context that becomes empty after cleaning must be removed entirely" + ); + } + + #[test] + fn build_payload_expands_evaluation_string_into_object() { + let payload = build_payload(&batch()).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + + let evaluation = &v["flagEvaluations"][0]["context"]["evaluation"]; + assert!( + evaluation.is_object(), + "context.evaluation must be a JSON object in the POST body, not a string: {evaluation}" + ); + assert_eq!( + evaluation["country"], "US", + "the expanded object must preserve the original key/value" + ); + assert!( + !evaluation.is_string(), + "context.evaluation must not remain a quoted string" + ); + } + + #[test] + fn build_payload_drops_unparseable_evaluation_gracefully() { + let mut batch = batch(); + batch.flag_evaluations[0].context = Some(FlagEvalEventContext { + evaluation: Some("this is not json".to_owned()), + dd: None, + }); + + let payload = build_payload(&batch).expect("build_payload must not fail on bad input"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + + assert!( + v["flagEvaluations"][0]["context"] + .get("evaluation") + .is_none(), + "unparseable evaluation must be dropped from the body" + ); + } + + #[test] + fn build_payload_preserves_false_and_empty_context_values() { + let mut batch = batch(); + batch.flag_evaluations[0].context = Some(FlagEvalEventContext { + evaluation: Some( + serde_json::json!({ + "enabled": false, + "empty": "", + "empty_object": {}, + "empty_array": [] + }) + .to_string(), + ), + dd: None, + }); + + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + let evaluation = &v["flagEvaluations"][0]["context"]["evaluation"]; + + assert_eq!(evaluation["enabled"], false); + assert_eq!(evaluation["empty"], ""); + assert!(evaluation["empty_object"].is_object()); + assert!(evaluation["empty_array"].is_array()); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn posts_to_evp_proxy() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + + send_batch(&client, &ep, batch()).await; + + mock.assert_async().await; + assert_eq!(mock.calls_async().await, 1); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn splits_large_batches_before_posting() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + let mut batch = batch(); + let event = batch.flag_evaluations[0].clone(); + batch.flag_evaluations = vec![event; MAX_EVENTS_PER_POST * 2 + 1]; + + send_batch(&client, &ep, batch).await; + + mock.assert_calls_async(3).await; + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn coalesces_identical_batches_before_posting() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH) + .body_includes("\"evaluation_count\":10"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + let coalescer = FlagEvaluationCoalescer::default(); + + coalescer.enqueue(client.clone(), ep.clone(), batch()); + coalescer.enqueue(client.clone(), ep, batch()); + + for _ in 0..100 { + if mock.calls_async().await == 1 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + + mock.assert_calls_async(1).await; + } + + #[test] + fn coalescer_degrades_after_per_flag_cap() { + let endpoint = Endpoint { + url: "http://agent:8126".parse().unwrap(), + ..Endpoint::default() + }; + let ep = flagevaluation_endpoint(&endpoint).unwrap(); + let coalescer = FlagEvaluationCoalescer::default(); + coalescer.state.lock().unwrap().flush_running = true; + + let mut events = Vec::with_capacity(PER_FLAG_CAP + 50); + for index in 0..(PER_FLAG_CAP + 50) { + let mut event = full_event(); + event.evaluation_count = 1; + event.targeting_key = Some(format!("user-{index}")); + events.push(event); + } + + coalescer.enqueue( + NativeCapabilities::new_client(), + ep, + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: events, + }, + ); + + let batches = coalescer.take_batches(); + assert_eq!(batches.len(), 1); + let events = &batches[0].1.flag_evaluations; + let full_events = events + .iter() + .filter(|event| event.targeting_key.is_some() || event.context.is_some()) + .count(); + let degraded = events + .iter() + .find(|event| event.targeting_key.is_none() && event.context.is_none()) + .expect("overflow must be folded into a degraded bucket"); + + assert_eq!(full_events, PER_FLAG_CAP); + assert_eq!(degraded.evaluation_count, 50); + assert_eq!( + degraded.variant.as_ref().map(|v| v.key.as_str()), + Some("on") + ); + assert_eq!( + degraded.allocation.as_ref().map(|a| a.key.as_str()), + Some("alloc-a") + ); + assert_eq!( + degraded + .targeting_rule + .as_ref() + .map(|rule| rule.key.as_str()), + Some("rule-1") + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn non_2xx_does_not_panic() { + let server = MockServer::start_async().await; + let _mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH); + then.status(500).body("intake overloaded"); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + send_batch(&client, &ep, batch()).await; + // Test passes if no panic occurs. + } + + #[tokio::test] + async fn timeout_returns_without_waiting_for_http_response() { + let ep = Endpoint { + url: "http://localhost:8126".parse().unwrap(), + timeout_ms: 1, + ..Endpoint::default() + }; + + send_batch(&HangingCapabilities, &ep, batch()).await; + // Test passes if function returns before the pending HTTP future resolves. + } + + #[test] + fn endpoint_preserves_authority_overrides_path() { + let base = Endpoint { + url: "http://agent.internal:8126/v0.4/traces".parse().unwrap(), + ..Endpoint::default() + }; + let ep = flagevaluation_endpoint(&base).unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.authority().unwrap().as_str(), "agent.internal:8126"); + assert_eq!(ep.url.path(), EVP_FLAGEVALUATIONS_PATH); + } + + #[test] + fn endpoint_rejects_agentless() { + let base = Endpoint { + url: "https://trace.agent.datadoghq.com/v0.4/traces" + .parse() + .unwrap(), + api_key: Some("api-key".into()), + ..Endpoint::default() + }; + assert!(flagevaluation_endpoint(&base).is_none()); + } + + #[derive(Clone, Debug)] + struct HangingCapabilities; + + impl HttpClientCapability for HangingCapabilities { + fn new_client() -> Self { + Self + } + + fn request( + &self, + _req: http::Request, + ) -> impl future::Future, HttpError>> + MaybeSend + { + future::pending() + } + } + + impl SleepCapability for HangingCapabilities { + fn new() -> Self { + Self + } + + async fn sleep(&self, _duration: Duration) {} + } +} diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index a7019781ff..bd95da24bf 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -5,6 +5,10 @@ use crate::config; pub use datadog_ffe::telemetry::evaluation_metrics::FfeEvaluationMetric; pub use datadog_ffe::telemetry::exposures::{FfeExposure, FfeExposureBatch}; +pub use datadog_ffe::telemetry::flagevaluation::{ + AllocationKey, ContextDD, EvalError, FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, + FlagEvalEventContext, FlagKey, TargetingRuleKey, VariantKey, +}; pub use datadog_ffe::telemetry::FfeTelemetryContext; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -32,6 +36,7 @@ pub mod blocking; mod debugger_diagnostics_bookkeeper; pub mod exception_hash_rate_limiter; pub(crate) mod ffe_exposures_flusher; +pub(crate) mod ffe_flagevaluation_flusher; pub(crate) mod ffe_metrics_flusher; mod instance_id; mod queue_id; @@ -100,4 +105,13 @@ pub enum SidecarAction { context: FfeTelemetryContext, metrics: Vec, }, + /// Structured FFE flag evaluation batch for the EVP flagevaluation track. + /// The sidecar serializes and POSTs the batch to + /// `/evp_proxy/v2/api/v2/flagevaluations` (fire-and-forget). PHP (EMIT-07) + /// drives the two-tier aggregation upstream and dispatches via this action. + /// + /// Keep this appended after pre-existing variants: this enum crosses the + /// bincode sidecar IPC boundary, so inserting a variant before existing + /// variants changes their wire ordinals. + FfeFlagEvaluationBatch(FfeFlagEvaluationBatch), } diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 5b2d27b801..90fa0e29d4 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -37,6 +37,7 @@ use crate::service::debugger_diagnostics_bookkeeper::{ }; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::ffe_exposures_flusher; +use crate::service::ffe_flagevaluation_flusher; use crate::service::ffe_metrics_flusher; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::stats_flusher::{ @@ -117,6 +118,8 @@ pub struct SidecarServer { pub(crate) ffe_http_client: NativeCapabilities, /// Sidecar-owned exposure cache, shared across sessions/connections. pub(crate) ffe_exposure_deduplicator: ffe_exposures_flusher::ExposureDeduplicator, + /// Sidecar-owned EVP flagevaluation coalescer, shared across PHP request lifetimes. + pub(crate) ffe_flagevaluation_coalescer: ffe_flagevaluation_flusher::FlagEvaluationCoalescer, } /// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect. @@ -446,6 +449,27 @@ impl SidecarInterface for ConnectionSidecarHandler { } false } + SidecarAction::FfeFlagEvaluationBatch(batch) => { + if let Some(base) = trace_config.endpoint.as_ref() { + if let Some(ep) = ffe_flagevaluation_flusher::flagevaluation_endpoint(base) + { + self.server.ffe_flagevaluation_coalescer.enqueue( + ffe_http_client.clone(), + ep, + batch.clone(), + ); + } else { + debug!( + "ffe_flagevaluation_flusher: could not derive endpoint, dropping batch" + ); + } + } else { + debug!( + "ffe_flagevaluation_flusher: no session endpoint, dropping batch" + ); + } + false + } SidecarAction::FfeEvaluationMetrics { context, metrics } => { if let Some(ep) = session.get_otlp_metrics_endpoint().clone() { let client = ffe_http_client.clone(); @@ -1060,6 +1084,11 @@ impl SidecarInterface for ConnectionSidecarHandler { } async fn flush(&self, _peer: PeerCredentials, options: SidecarFlushOptions) { + self.server + .ffe_flagevaluation_coalescer + .flush_now(self.server.ffe_http_client.clone()) + .await; + if options.traces_and_stats { let flusher = self.server.trace_flusher.clone(); if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await { diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 9954f6eb41..845f478b31 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -455,6 +455,7 @@ impl TelemetryCachedClient { } SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately SidecarAction::FfeExposureBatch(_) => {} // handled in sidecar_server + SidecarAction::FfeFlagEvaluationBatch(_) => {} // handled in sidecar_server SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server } }