Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,61 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_process_tags(
MaybeError::None
}

/// Parses a `key1=value1,key2=value2` OTLP header string into key/value pairs.
/// Empty and malformed (missing `=` or empty key) entries are skipped.
fn parse_otlp_traces_headers(raw: &str) -> std::vec::Vec<(String, String)> {
raw.split(',')
.filter_map(|pair| {
let pair = pair.trim();
if pair.is_empty() {
return None;
}
let (k, v) = pair.split_once('=')?;
let k = k.trim();
if k.is_empty() {
return None;
}
Some((k.to_string(), v.trim().to_string()))
})
.collect()
}

/// Sets the OTLP traces export configuration for an existing session.
///
/// This is additive and non-breaking: when `otlp_traces_endpoint` is non-null,
/// the session's traces are exported via libdatadog's OTLP `TraceExporter`
/// (HTTP/JSON) instead of the agent msgpack `/v0.4/traces` path. Passing a null
/// `otlp_traces_endpoint` clears the configuration and restores the default
/// agent path. Sessions that never call this function are unaffected.
///
/// `headers` is the raw `key=value,...` string (e.g. the value of
/// `OTEL_EXPORTER_OTLP_TRACES_HEADERS`); `timeout_ms` of `0` selects the
/// default OTLP request timeout. The endpoint URL is used as-is — the host
/// language is responsible for resolving the full `…/v1/traces` URL.
///
/// # Safety
/// `otlp_traces_endpoint`, when non-null, must point to a valid `Endpoint`. All
/// `CharSlice` arguments must point to valid, correctly-sized data.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_session_set_otlp_traces_endpoint(
transport: &mut Box<SidecarTransport>,
session_id: ffi::CharSlice,
otlp_traces_endpoint: *const Endpoint,
headers: ffi::CharSlice,
timeout_ms: u64,
) -> MaybeError {
let session_id: String = session_id.to_utf8_lossy().into();
let endpoint: Option<Endpoint> = unsafe { otlp_traces_endpoint.as_ref().cloned() };
let headers = parse_otlp_traces_headers(&headers.to_utf8_lossy());

try_c!(blocking::set_otlp_traces_config(
transport, session_id, endpoint, headers, timeout_ms,
));

MaybeError::None
}

#[repr(C)]
pub struct TracerHeaderTags<'a> {
pub lang: ffi::CharSlice<'a>,
Expand Down Expand Up @@ -1783,4 +1838,33 @@ mod tests {

assert_eq!(endpoint.test_token.as_deref(), Some("metrics-token"));
}

#[test]
fn parse_otlp_traces_headers_basic() {
let parsed = parse_otlp_traces_headers("api-key=abc123,team=apm");
assert_eq!(
parsed,
vec![
("api-key".to_string(), "abc123".to_string()),
("team".to_string(), "apm".to_string()),
]
);
}

#[test]
fn parse_otlp_traces_headers_trims_and_skips_malformed() {
let parsed = parse_otlp_traces_headers(" k1 = v1 , , bad , k2=v2 ");
assert_eq!(
parsed,
vec![
("k1".to_string(), "v1".to_string()),
("k2".to_string(), "v2".to_string()),
]
);
}

#[test]
fn parse_otlp_traces_headers_empty() {
assert!(parse_otlp_traces_headers("").is_empty());
}
}
15 changes: 15 additions & 0 deletions datadog-sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ pub fn set_session_process_tags(
Ok(())
}

/// Sets (or clears) the OTLP traces export configuration for a session.
///
/// A `None` `endpoint` clears the configuration and restores the default agent
/// trace path. This is additive: sessions that never call it are unaffected.
pub fn set_otlp_traces_config(
transport: &mut SidecarTransport,
session_id: String,
endpoint: Option<libdd_common::Endpoint>,
headers: Vec<(String, String)>,
timeout_ms: u64,
) -> io::Result<()> {
lock_sender(transport)?.set_otlp_traces_config(session_id, endpoint, headers, timeout_ms);
Ok(())
}

/// Sends a trace as bytes.
pub fn send_trace_v04_bytes(
transport: &mut SidecarTransport,
Expand Down
14 changes: 14 additions & 0 deletions datadog-sidecar/src/service/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,20 @@ impl SidecarSender {
self.channel.try_send_set_test_session_token(token);
}

pub fn set_otlp_traces_config(
&mut self,
session_id: String,
endpoint: Option<libdd_common::Endpoint>,
headers: Vec<(String, String)>,
timeout_ms: u64,
) {
if !self.try_drain_outbox() {
return;
}
self.channel
.try_send_set_otlp_traces_config(session_id, endpoint, headers, timeout_ms);
}

pub fn add_span_to_concentrator(
&mut self,
env: String,
Expand Down
28 changes: 28 additions & 0 deletions datadog-sidecar/src/service/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ pub(crate) struct SessionInfo {
pub(crate) process_tags: Arc<Mutex<Vec<Tag>>>,
pub(crate) stats_config: Arc<Mutex<Option<crate::service::stats_flusher::StatsConfig>>>,
otlp_metrics_endpoint: Arc<Mutex<Option<Endpoint>>>,
/// Lazily-built OTLP traces exporter, cached per session and reused across
/// flushes so the background `/info` worker is only spawned once. Keyed by a
/// fingerprint of the OTLP traces config so it is rebuilt when the config
/// changes (and cleared when OTLP trace export is disabled). `None` until the
/// first OTLP flush, or whenever OTLP trace export is not configured.
otlp_traces_exporter: Arc<tokio::sync::Mutex<Option<OtlpTracesExporter>>>,
}

/// A cached OTLP traces exporter together with the config fingerprint it was
/// built from, used to detect and rebuild on config changes.
pub(crate) struct OtlpTracesExporter {
pub(crate) fingerprint: String,
pub(crate) exporter: std::sync::Arc<
libdd_data_pipeline::trace_exporter::TraceExporter<
libdd_capabilities_impl::NativeCapabilities,
>,
>,
}

impl SessionInfo {
Expand Down Expand Up @@ -172,6 +189,17 @@ impl SessionInfo {
f(&mut self.get_otlp_metrics_endpoint());
}

/// Returns the cache slot for this session's lazily-built OTLP traces
/// exporter. The trace send path locks this, rebuilds the exporter when the
/// config fingerprint changes, and reuses it otherwise. Because rebuilds are
/// keyed on a config fingerprint, a configuration change is picked up on the
/// next flush without any explicit cache invalidation.
pub(crate) fn otlp_traces_exporter(
&self,
) -> Arc<tokio::sync::Mutex<Option<OtlpTracesExporter>>> {
self.otlp_traces_exporter.clone()
}

pub(crate) fn get_trace_config(&self) -> MutexGuard<'_, tracer::Config> {
self.tracer_config.lock_or_panic()
}
Expand Down
22 changes: 22 additions & 0 deletions datadog-sidecar/src/service/sidecar_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,28 @@ pub trait SidecarInterface {
/// * `process_tags` - The process tags.
async fn set_session_process_tags(process_tags: Vec<Tag>);

/// Sets (or clears) the OTLP traces export configuration for a session.
///
/// This is additive: when an OTLP traces endpoint is configured, the
/// session's traces are exported via libdatadog's OTLP `TraceExporter`
/// instead of the agent msgpack `/v0.4/traces` path. A `None` endpoint
/// clears the configuration and restores the default agent path. The default
/// (no call, or a `None` endpoint) leaves trace export behaviour unchanged.
///
/// # Arguments
///
/// * `session_id` - The ID of the session.
/// * `endpoint` - The full OTLP traces intake endpoint (e.g. `http://host:4318/v1/traces`),
/// resolved by the host language; `None` disables OTLP trace export.
/// * `headers` - Header key/value pairs to attach to OTLP requests.
/// * `timeout_ms` - Request timeout in milliseconds (`0` for the default).
async fn set_otlp_traces_config(
session_id: String,
endpoint: Option<libdd_common::Endpoint>,
headers: Vec<(String, String)>,
timeout_ms: u64,
);

/// Removes the application entry for the given queue ID from the instance.
///
/// # Arguments
Expand Down
Loading
Loading