Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions datadog-ffe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@ libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true }
prost = { version = "0.14.1", optional = true }
pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] }

[dev-dependencies]
# Matches the sidecar's bincode version. Used by the flagevaluation bincode
# round-trip test, which guards against `skip_serializing_if` reintroducing the
# worker→sidecar IPC field-misalignment bug (bincode is non-self-describing).
bincode = { version = "1.3.3" }

[features]
default = ["remote-config"]
exposure-events = ["dep:lru"]
evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"]
flagevaluation-evp = []
pyo3 = ["dep:pyo3"]
remote-config = ["dep:libdd-remote-config"]
6 changes: 5 additions & 1 deletion datadog-ffe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ mod flag_type;
#[cfg(feature = "remote-config")]
mod remote_config;
pub mod rules_based;
#[cfg(any(feature = "exposure-events", feature = "evaluation-metrics"))]
#[cfg(any(
feature = "exposure-events",
feature = "evaluation-metrics",
feature = "flagevaluation-evp"
))]
pub mod telemetry;

pub use flag_type::{ExpectedFlagType, FlagType};
429 changes: 429 additions & 0 deletions datadog-ffe/src/telemetry/flagevaluation.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions datadog-ffe/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
pub mod evaluation_metrics;
#[cfg(feature = "exposure-events")]
pub mod exposures;
#[cfg(feature = "flagevaluation-evp")]
pub mod flagevaluation;

use serde::{Deserialize, Serialize};

Expand Down
147 changes: 143 additions & 4 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ use datadog_sidecar::service::agent_info::AgentInfoReader;
use datadog_sidecar::service::telemetry::InternalTelemetryAction;
use datadog_sidecar::service::{
blocking::{self, SidecarTransport},
DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric,
FfeExposure as SidecarFfeExposure, FfeExposureBatch as SidecarFfeExposureBatch,
FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata,
SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions,
AllocationKey, ContextDD, DynamicInstrumentationConfigState, EvalError,
FfeEvaluationMetric as SidecarFfeEvaluationMetric, FfeExposure as SidecarFfeExposure,
FfeExposureBatch as SidecarFfeExposureBatch,
FfeFlagEvaluationBatch as SidecarFfeFlagEvaluationBatch,
FfeFlagEvaluationEvent as SidecarFfeFlagEvaluationEvent,
FfeTelemetryContext as SidecarFfeTelemetryContext, FlagEvalEventContext, FlagKey, InstanceId,
QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
SidecarFlushOptions, TargetingRuleKey, VariantKey,
};
use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions};
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
Expand Down Expand Up @@ -1176,6 +1180,23 @@ pub struct FfeEvaluationMetric<'a> {
pub allocation_key: CharSlice<'a>,
}

#[repr(C)]
pub struct FfeFlagEvaluation<'a> {
pub timestamp_ms: i64,
pub flag_key: CharSlice<'a>,
pub first_evaluation_ms: i64,
pub last_evaluation_ms: i64,
pub evaluation_count: u64,
pub variant: CharSlice<'a>,
pub allocation_key: CharSlice<'a>,
pub targeting_rule_key: CharSlice<'a>,
pub targeting_key: CharSlice<'a>,
/// UTF-8 JSON object. Empty, invalid, or non-object JSON is omitted.
pub evaluation_context_json: CharSlice<'a>,
pub error_message: CharSlice<'a>,
pub runtime_default_used: bool,
}

/// Send structured FFE exposure events to the sidecar. The sidecar owns
/// deduplication, JSON serialization, and Agent EVP delivery. This function is
/// caller-driven; shared libdatadog evaluator calls do not log unless an SDK
Expand Down Expand Up @@ -1247,6 +1268,78 @@ fn ddog_sidecar_send_ffe_exposure_batch_impl(
MaybeError::None
}

/// Send structured FFE flag evaluation events to the sidecar. The sidecar owns
/// JSON serialization and Agent EVP delivery. This function is caller-driven;
/// callers must aggregate and bound event cardinality before passing a batch.
///
/// # Safety
/// `context` and every element in `flag_evaluations` must contain valid UTF-8
/// `CharSlice` values. Empty `flag_evaluations` is a no-op.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_send_ffe_flag_evaluation_batch(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
queue_id: &QueueId,
context: &FfeTelemetryContext<'_>,
flag_evaluations: Slice<FfeFlagEvaluation<'_>>,
) -> MaybeError {
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
ddog_sidecar_send_ffe_flag_evaluation_batch_impl(
transport,
instance_id,
queue_id,
context,
flag_evaluations,
)
}))
.unwrap_or_else(|panic| {
MaybeError::Some(libdd_common_ffi::utils::handle_panic_error(
panic,
"ddog_sidecar_send_ffe_flag_evaluation_batch",
))
})
}

fn ddog_sidecar_send_ffe_flag_evaluation_batch_impl(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
queue_id: &QueueId,
context: &FfeTelemetryContext<'_>,
flag_evaluations: Slice<FfeFlagEvaluation<'_>>,
) -> MaybeError {
let flag_evaluations = try_c!(flag_evaluations
.try_as_slice()
.map_err(|e| format!("Invalid flag evaluation slice: {e}")));

if flag_evaluations.is_empty() {
return MaybeError::None;
}

let context = try_c!(ffe_context_from_ffi(context));
let flag_evaluations = try_c!(flag_evaluations
.iter()
.map(|event| ffe_flag_evaluation_from_ffi(event, &context.service))
.collect::<Result<Vec<_>, _>>());

if flag_evaluations.is_empty() {
return MaybeError::None;
}

try_c!(blocking::enqueue_actions_reliable(
transport,
instance_id,
queue_id,
vec![SidecarAction::FfeFlagEvaluationBatch(
SidecarFfeFlagEvaluationBatch {
context,
flag_evaluations,
}
)],
));
MaybeError::None
}

/// Send structured FFE evaluation metric events to the sidecar. The sidecar
/// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This
/// function is caller-driven so SDKs with existing host-language hooks can
Expand Down Expand Up @@ -1311,6 +1404,37 @@ fn ffe_exposure_from_ffi(exposure: &FfeExposure<'_>) -> Result<SidecarFfeExposur
})
}

fn ffe_flag_evaluation_from_ffi(
event: &FfeFlagEvaluation<'_>,
service: &str,
) -> Result<SidecarFfeFlagEvaluationEvent, String> {
let evaluation = optional_json_object_string(event.evaluation_context_json)?;
let context = evaluation.map(|evaluation| FlagEvalEventContext {
evaluation: Some(evaluation),
dd: Some(ContextDD {
service: service.to_owned(),
}),
});

Ok(SidecarFfeFlagEvaluationEvent {
timestamp: event.timestamp_ms,
flag: FlagKey {
key: char_slice_to_string(event.flag_key)?,
},
first_evaluation: event.first_evaluation_ms,
last_evaluation: event.last_evaluation_ms,
evaluation_count: event.evaluation_count,
variant: optional_string(event.variant)?.map(|key| VariantKey { key }),
allocation: optional_string(event.allocation_key)?.map(|key| AllocationKey { key }),
targeting_rule: optional_string(event.targeting_rule_key)?
.map(|key| TargetingRuleKey { key }),
targeting_key: optional_string(event.targeting_key)?,
context,
error: optional_string(event.error_message)?.map(|message| EvalError { message }),
runtime_default_used: event.runtime_default_used,
})
}

fn ffe_metric_from_ffi(
metric: &FfeEvaluationMetric<'_>,
) -> Result<SidecarFfeEvaluationMetric, String> {
Expand All @@ -1331,6 +1455,21 @@ fn optional_string(slice: CharSlice) -> Result<Option<String>, String> {
}
}

fn optional_json_object_string(slice: CharSlice) -> Result<Option<String>, String> {
let Some(raw) = optional_string(slice)? else {
return Ok(None);
};
let value = match serde_json::from_str::<serde_json::Value>(&raw) {
Ok(value) => value,
Err(_) => return Ok(None),
};
if value.is_object() {
Ok(Some(value.to_string()))
} else {
Ok(None)
}
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
#[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals
Expand Down
2 changes: 1 addition & 1 deletion datadog-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" }
libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] }
libdd-remote-config = { path = "../libdd-remote-config" }
datadog-live-debugger = { path = "../datadog-live-debugger" }
datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] }
datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics", "flagevaluation-evp"] }
libdd-crashtracker = { path = "../libdd-crashtracker" }
libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" }
libdd-tinybytes = { path = "../libdd-tinybytes" }
Expand Down
28 changes: 27 additions & 1 deletion datadog-sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{
SessionConfig, SidecarAction, SidecarFlushOptions,
};
use crate::service::sender::SidecarSender;
use crate::service::sidecar_interface::SidecarInterfaceChannel;
use crate::service::sidecar_interface::{SidecarInterfaceChannel, SidecarInterfaceRequest};
use datadog_ipc::platform::{FileBackedHandle, ShmHandle};
use datadog_ipc::SeqpacketConn;
use datadog_live_debugger::debugger_defs::DebuggerPayload;
Expand Down Expand Up @@ -222,6 +222,32 @@ pub fn enqueue_actions(
Ok(())
}

/// Reliably enqueues a list of actions to be performed.
///
/// Unlike [`enqueue_actions`], this uses the checked, blocking channel path with
/// no load-shedding and no silent drop: the `io::Result` from the send
/// propagates to the caller. On a broken pipe / connection reset /
/// not-connected error the transport reconnects and retries the exact same
/// pre-encoded request bytes once on the fresh connection.
///
/// Intended for one-shot, non-replayed payloads (for example FFE
/// flagevaluation batches) that must not be silently lost under transient
/// backpressure or a broken pipe.
pub fn enqueue_actions_reliable(
transport: &mut SidecarTransport,
instance_id: &InstanceId,
queue_id: &QueueId,
actions: Vec<SidecarAction>,
) -> io::Result<()> {
let req = SidecarInterfaceRequest::EnqueueActions {
instance_id: instance_id.clone(),
queue_id: *queue_id,
actions,
};
let data = datadog_ipc::codec::encode(&req);
transport.with_retry(|sender| sender.drain_and_send_raw_blocking(&data))
}

/// Removes the application entry for the given queue ID from the instance.
pub fn clear_queue_id(
transport: &mut SidecarTransport,
Expand Down
Loading
Loading