diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index c5aef5422a..ca4ed29e56 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -744,6 +744,61 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_process_tags( MaybeError::None } +/// Parses a `key1=value1,key2=value2` OTLP header string into key/value pairs. +/// Empty and malformed (missing `=` or empty key) entries are skipped. +fn parse_otlp_traces_headers(raw: &str) -> std::vec::Vec<(String, String)> { + raw.split(',') + .filter_map(|pair| { + let pair = pair.trim(); + if pair.is_empty() { + return None; + } + let (k, v) = pair.split_once('=')?; + let k = k.trim(); + if k.is_empty() { + return None; + } + Some((k.to_string(), v.trim().to_string())) + }) + .collect() +} + +/// Sets the OTLP traces export configuration for an existing session. +/// +/// This is additive and non-breaking: when `otlp_traces_endpoint` is non-null, +/// the session's traces are exported via libdatadog's OTLP `TraceExporter` +/// (HTTP/JSON) instead of the agent msgpack `/v0.4/traces` path. Passing a null +/// `otlp_traces_endpoint` clears the configuration and restores the default +/// agent path. Sessions that never call this function are unaffected. +/// +/// `headers` is the raw `key=value,...` string (e.g. the value of +/// `OTEL_EXPORTER_OTLP_TRACES_HEADERS`); `timeout_ms` of `0` selects the +/// default OTLP request timeout. The endpoint URL is used as-is — the host +/// language is responsible for resolving the full `…/v1/traces` URL. +/// +/// # Safety +/// `otlp_traces_endpoint`, when non-null, must point to a valid `Endpoint`. All +/// `CharSlice` arguments must point to valid, correctly-sized data. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_session_set_otlp_traces_endpoint( + transport: &mut Box, + session_id: ffi::CharSlice, + otlp_traces_endpoint: *const Endpoint, + headers: ffi::CharSlice, + timeout_ms: u64, +) -> MaybeError { + let session_id: String = session_id.to_utf8_lossy().into(); + let endpoint: Option = unsafe { otlp_traces_endpoint.as_ref().cloned() }; + let headers = parse_otlp_traces_headers(&headers.to_utf8_lossy()); + + try_c!(blocking::set_otlp_traces_config( + transport, session_id, endpoint, headers, timeout_ms, + )); + + MaybeError::None +} + #[repr(C)] pub struct TracerHeaderTags<'a> { pub lang: ffi::CharSlice<'a>, @@ -1783,4 +1838,33 @@ mod tests { assert_eq!(endpoint.test_token.as_deref(), Some("metrics-token")); } + + #[test] + fn parse_otlp_traces_headers_basic() { + let parsed = parse_otlp_traces_headers("api-key=abc123,team=apm"); + assert_eq!( + parsed, + vec![ + ("api-key".to_string(), "abc123".to_string()), + ("team".to_string(), "apm".to_string()), + ] + ); + } + + #[test] + fn parse_otlp_traces_headers_trims_and_skips_malformed() { + let parsed = parse_otlp_traces_headers(" k1 = v1 , , bad , k2=v2 "); + assert_eq!( + parsed, + vec![ + ("k1".to_string(), "v1".to_string()), + ("k2".to_string(), "v2".to_string()), + ] + ); + } + + #[test] + fn parse_otlp_traces_headers_empty() { + assert!(parse_otlp_traces_headers("").is_empty()); + } } diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 12a1bd2302..1ec07368e9 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -258,6 +258,21 @@ pub fn set_session_process_tags( Ok(()) } +/// Sets (or clears) the OTLP traces export configuration for a session. +/// +/// A `None` `endpoint` clears the configuration and restores the default agent +/// trace path. This is additive: sessions that never call it are unaffected. +pub fn set_otlp_traces_config( + transport: &mut SidecarTransport, + session_id: String, + endpoint: Option, + headers: Vec<(String, String)>, + timeout_ms: u64, +) -> io::Result<()> { + lock_sender(transport)?.set_otlp_traces_config(session_id, endpoint, headers, timeout_ms); + Ok(()) +} + /// Sends a trace as bytes. pub fn send_trace_v04_bytes( transport: &mut SidecarTransport, diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index e88720e23f..e93004ede8 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -438,6 +438,20 @@ impl SidecarSender { self.channel.try_send_set_test_session_token(token); } + pub fn set_otlp_traces_config( + &mut self, + session_id: String, + endpoint: Option, + headers: Vec<(String, String)>, + timeout_ms: u64, + ) { + if !self.try_drain_outbox() { + return; + } + self.channel + .try_send_set_otlp_traces_config(session_id, endpoint, headers, timeout_ms); + } + pub fn add_span_to_concentrator( &mut self, env: String, diff --git a/datadog-sidecar/src/service/session_info.rs b/datadog-sidecar/src/service/session_info.rs index 8313a41e43..79f320f2d4 100644 --- a/datadog-sidecar/src/service/session_info.rs +++ b/datadog-sidecar/src/service/session_info.rs @@ -47,6 +47,23 @@ pub(crate) struct SessionInfo { pub(crate) process_tags: Arc>>, pub(crate) stats_config: Arc>>, otlp_metrics_endpoint: Arc>>, + /// Lazily-built OTLP traces exporter, cached per session and reused across + /// flushes so the background `/info` worker is only spawned once. Keyed by a + /// fingerprint of the OTLP traces config so it is rebuilt when the config + /// changes (and cleared when OTLP trace export is disabled). `None` until the + /// first OTLP flush, or whenever OTLP trace export is not configured. + otlp_traces_exporter: Arc>>, +} + +/// A cached OTLP traces exporter together with the config fingerprint it was +/// built from, used to detect and rebuild on config changes. +pub(crate) struct OtlpTracesExporter { + pub(crate) fingerprint: String, + pub(crate) exporter: std::sync::Arc< + libdd_data_pipeline::trace_exporter::TraceExporter< + libdd_capabilities_impl::NativeCapabilities, + >, + >, } impl SessionInfo { @@ -172,6 +189,17 @@ impl SessionInfo { f(&mut self.get_otlp_metrics_endpoint()); } + /// Returns the cache slot for this session's lazily-built OTLP traces + /// exporter. The trace send path locks this, rebuilds the exporter when the + /// config fingerprint changes, and reuses it otherwise. Because rebuilds are + /// keyed on a config fingerprint, a configuration change is picked up on the + /// next flush without any explicit cache invalidation. + pub(crate) fn otlp_traces_exporter( + &self, + ) -> Arc>> { + self.otlp_traces_exporter.clone() + } + pub(crate) fn get_trace_config(&self) -> MutexGuard<'_, tracer::Config> { self.tracer_config.lock_or_panic() } diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index 6f6244b258..ac70d84eec 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -69,6 +69,28 @@ pub trait SidecarInterface { /// * `process_tags` - The process tags. async fn set_session_process_tags(process_tags: Vec); + /// Sets (or clears) the OTLP traces export configuration for a session. + /// + /// This is additive: when an OTLP traces endpoint is configured, the + /// session's traces are exported via libdatadog's OTLP `TraceExporter` + /// instead of the agent msgpack `/v0.4/traces` path. A `None` endpoint + /// clears the configuration and restores the default agent path. The default + /// (no call, or a `None` endpoint) leaves trace export behaviour unchanged. + /// + /// # Arguments + /// + /// * `session_id` - The ID of the session. + /// * `endpoint` - The full OTLP traces intake endpoint (e.g. `http://host:4318/v1/traces`), + /// resolved by the host language; `None` disables OTLP trace export. + /// * `headers` - Header key/value pairs to attach to OTLP requests. + /// * `timeout_ms` - Request timeout in milliseconds (`0` for the default). + async fn set_otlp_traces_config( + session_id: String, + endpoint: Option, + headers: Vec<(String, String)>, + timeout_ms: u64, + ); + /// Removes the application entry for the given queue ID from the instance. /// /// # Arguments diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 218e7f7d05..cdb504d46e 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -38,6 +38,7 @@ use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::ffe_exposures_flusher; use crate::service::ffe_metrics_flusher; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; +use crate::service::session_info::OtlpTracesExporter; use crate::service::stats_flusher::{ flush_all_stats_now, get_or_create_concentrator, stats_endpoint, ConcentratorKey, SpanConcentratorState, StatsConfig, @@ -48,6 +49,7 @@ use datadog_live_debugger::sender::{agent_info_supports_debugger_v2_endpoint, De use datadog_remote_config::fetch::{ConfigInvariants, ConfigOptions, MultiTargetStats}; use libdd_capabilities_impl::NativeCapabilities; use libdd_common::tag::Tag; +use libdd_data_pipeline::trace_exporter::TraceExporterBuilder; use libdd_dogstatsd_client::{new, DogStatsDActionOwned}; use libdd_telemetry::config::Config; use libdd_tinybytes as tinybytes; @@ -242,6 +244,128 @@ impl SidecarServer { debug!("Successfully shut down session: {}", session_id); } + /// Exports a session's v0.4 msgpack trace payload via the OTLP HTTP/JSON + /// `TraceExporter`. + /// + /// The exporter is built lazily and cached on the session (keyed by a + /// fingerprint of the OTLP traces config) so the per-exporter background + /// `/info` worker is only spawned once and reused across flushes; it is + /// rebuilt automatically when the config changes. The exporter's `send` + /// decodes the v0.4 payload, drops unsampled (p0) chunks itself, maps the + /// remaining chunks to OTLP using the session resource metadata, and POSTs + /// them to the configured endpoint. + async fn send_trace_otlp(session: &SessionInfo, data: &[u8]) { + // Snapshot the OTLP config (and resource metadata) under the lock, then + // release it before any await. + let (endpoint, headers, timeout_ms, language, language_version, tracer_version) = { + let cfg = session.get_trace_config(); + let Some(endpoint) = cfg.otlp_traces_endpoint.clone() else { + // Config was cleared between the caller's check and here. + return; + }; + ( + endpoint, + cfg.otlp_traces_headers.clone(), + cfg.otlp_traces_timeout_ms, + cfg.language.clone(), + cfg.language_version.clone(), + cfg.tracer_version.clone(), + ) + }; + + // Resource attributes: reuse the session's stats/service metadata. + let (service, hostname, process_tags) = { + let stats = session.stats_config.lock_or_panic(); + match &*stats { + Some(s) => ( + s.root_service.clone(), + s.hostname.clone(), + s.process_tags.clone(), + ), + None => (String::new(), String::new(), String::new()), + } + }; + + // Fingerprint the inputs that affect exporter construction so the cache + // is rebuilt only when something relevant changes. + let fingerprint = format!( + "{}|{}|{}|{}|{}|{}|{}", + endpoint.url, + endpoint.test_token.as_deref().unwrap_or(""), + timeout_ms, + service, + language, + language_version, + tracer_version, + ); + let fingerprint = headers.iter().fold(fingerprint, |mut acc, (k, v)| { + acc.push('|'); + acc.push_str(k); + acc.push('='); + acc.push_str(v); + acc + }); + + let cache = session.otlp_traces_exporter(); + let mut guard = cache.lock().await; + let needs_build = guard + .as_ref() + .map(|e| e.fingerprint != fingerprint) + .unwrap_or(true); + if needs_build { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint(&endpoint.url.to_string()) + .set_otlp_headers(headers) + .set_connection_timeout(if timeout_ms == 0 { + None + } else { + Some(timeout_ms) + }) + .set_service(&service) + .set_hostname(&hostname) + .set_process_tags(&process_tags) + .set_language(&language) + .set_language_version(&language_version) + .set_tracer_version(&tracer_version); + // NOTE: env / app_version are deliberately NOT set at the exporter level here. + // Unlike `service`, the sidecar has no reliable session-level env/version (they + // arrive per-runtime via `set_universal_service_tags` and are dynamic per-trace). + // Instead the OTLP encoder derives the resource attributes + // `deployment.environment.name` / `service.version` from the `env` / `version` + // meta tags carried on each exported trace's spans (see + // `libdd-trace-utils/src/otlp_encoder/mapper.rs`), which is correct per-trace even + // for a multi-service session. Setting them here would be empty and pointless. + if let Some(token) = endpoint.test_token.as_deref() { + builder.set_test_session_token(token); + } + match builder.build_async::().await { + Ok(exporter) => { + *guard = Some(OtlpTracesExporter { + fingerprint, + exporter: Arc::new(exporter), + }); + } + Err(e) => { + error!("Failed to build OTLP traces exporter: {e:?}"); + return; + } + } + } + + // Clone the cached exporter Arc so we can drop the cache lock before the + // (potentially slow) network send. + let Some(exporter) = guard.as_ref().map(|e| e.exporter.clone()) else { + return; + }; + drop(guard); + + match exporter.send_async(data).await { + Ok(_) => debug!("Successfully exported traces via OTLP to {}", endpoint.url), + Err(e) => error!("Error exporting traces via OTLP: {e:?}"), + } + } + fn send_trace_v04( &self, headers: &SerializedTracerHeaderTags, @@ -824,6 +948,29 @@ impl SidecarInterface for ConnectionSidecarHandler { *session.process_tags.lock_or_panic() = process_tags; } + async fn set_otlp_traces_config( + &self, + _peer: PeerCredentials, + session_id: String, + endpoint: Option, + headers: Vec<(String, String)>, + timeout_ms: u64, + ) { + let session = self.server.get_session(&session_id); + debug!( + "Setting OTLP traces config for session {session_id}: endpoint={:?}, {} header(s), timeout_ms={timeout_ms}", + endpoint.as_ref().map(|e| e.url.to_string()), + headers.len() + ); + // Store the resolved OTLP traces config onto the per-session trace + // config. The send path rebuilds its cached OTLP exporter when this + // config's fingerprint changes (and skips OTLP entirely when the + // endpoint is None), so no explicit cache invalidation is needed here. + session.modify_trace_config(|cfg| { + cfg.set_otlp_traces_endpoint(endpoint, headers, timeout_ms); + }); + } + async fn shutdown_runtime(&self, _peer: PeerCredentials, instance_id: InstanceId) { let session = self.server.get_session(&instance_id.session_id); tokio::spawn(async move { session.shutdown_runtime(&instance_id.runtime_id).await }); @@ -844,13 +991,30 @@ impl SidecarInterface for ConnectionSidecarHandler { headers: SerializedTracerHeaderTags, ) { self.track_instance(&instance_id); - if let Some(endpoint) = self - .server - .get_session(&instance_id.session_id) - .get_trace_config() - .endpoint - .clone() - { + let session = self.server.get_session(&instance_id.session_id); + + // Snapshot the routing decision while holding the config lock, then drop + // the guard before any await or spawn. + let (otlp_enabled, agent_endpoint) = { + let cfg = session.get_trace_config(); + (cfg.otlp_traces_endpoint.is_some(), cfg.endpoint.clone()) + }; + + // OTLP path takes precedence when an OTLP traces endpoint is configured + // for this session; otherwise fall back to the unchanged agent path. + if otlp_enabled { + tokio::spawn(async move { + match handle.map() { + Ok(mapped) => { + SidecarServer::send_trace_otlp(&session, mapped.as_slice()).await; + } + Err(e) => error!("Failed mapping shared trace data memory: {}", e), + } + }); + return; + } + + if let Some(endpoint) = agent_endpoint { let server = self.server.clone(); tokio::spawn(async move { match handle.map() { @@ -877,13 +1041,25 @@ impl SidecarInterface for ConnectionSidecarHandler { headers: SerializedTracerHeaderTags, ) { self.track_instance(&instance_id); - if let Some(endpoint) = self - .server - .get_session(&instance_id.session_id) - .get_trace_config() - .endpoint - .clone() - { + let session = self.server.get_session(&instance_id.session_id); + + // Snapshot the routing decision while holding the config lock, then drop + // the guard before any await or spawn. + let (otlp_enabled, agent_endpoint) = { + let cfg = session.get_trace_config(); + (cfg.otlp_traces_endpoint.is_some(), cfg.endpoint.clone()) + }; + + // OTLP path takes precedence when an OTLP traces endpoint is configured + // for this session; otherwise fall back to the unchanged agent path. + if otlp_enabled { + tokio::spawn(async move { + SidecarServer::send_trace_otlp(&session, &data).await; + }); + return; + } + + if let Some(endpoint) = agent_endpoint { let server = self.server.clone(); tokio::spawn(async move { let bytes = tinybytes::Bytes::from(data); diff --git a/datadog-sidecar/src/tracer.rs b/datadog-sidecar/src/tracer.rs index 07deabf078..a2a49b056f 100644 --- a/datadog-sidecar/src/tracer.rs +++ b/datadog-sidecar/src/tracer.rs @@ -32,6 +32,18 @@ pub struct Config { pub language: String, pub language_version: String, pub tracer_version: String, + /// Optional OTLP traces intake endpoint, used as-is (e.g. + /// `http://host:4318/v1/traces`). When set, traces for this session are + /// exported via libdatadog's OTLP `TraceExporter` instead of the agent + /// msgpack `/v0.4/traces` path. Resolved by the host language (e.g. from + /// `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before being forwarded here. + pub otlp_traces_endpoint: Option, + /// Headers to attach to OTLP trace export requests, parsed by the host + /// language from `OTEL_EXPORTER_OTLP_TRACES_HEADERS`. + pub otlp_traces_headers: Vec<(String, String)>, + /// OTLP trace export request timeout in milliseconds + /// (`OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`). `0` means "use the default". + pub otlp_traces_timeout_ms: u64, } impl Config { @@ -55,9 +67,72 @@ impl Config { endpoint.test_token = test_token.map(|t| t.into()); } } + + /// Sets the OTLP traces export configuration for this session. The endpoint + /// URL is stored as-is (the host language resolves the full + /// `…/v1/traces` URL). A `None` endpoint disables OTLP trace export and + /// restores the default agent msgpack path. + pub fn set_otlp_traces_endpoint( + &mut self, + endpoint: Option, + headers: Vec<(String, String)>, + timeout_ms: u64, + ) { + self.otlp_traces_endpoint = endpoint; + self.otlp_traces_headers = headers; + self.otlp_traces_timeout_ms = timeout_ms; + } } pub fn shm_limiter_path() -> CString { #[allow(clippy::unwrap_used)] CString::new(format!("/ddlimiters-{}", primary_sidecar_identifier())).unwrap() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn otlp_traces_endpoint_defaults_to_none() { + let cfg = Config::default(); + assert!(cfg.otlp_traces_endpoint.is_none()); + assert!(cfg.otlp_traces_headers.is_empty()); + assert_eq!(cfg.otlp_traces_timeout_ms, 0); + } + + #[test] + fn set_otlp_traces_endpoint_stores_url_as_is() { + let mut cfg = Config::default(); + let endpoint = Endpoint::from_slice("http://collector:4318/v1/traces"); + cfg.set_otlp_traces_endpoint( + Some(endpoint), + vec![("api-key".to_string(), "secret".to_string())], + 5000, + ); + + let stored = cfg.otlp_traces_endpoint.expect("endpoint should be set"); + // The traces endpoint is used verbatim (unlike `set_endpoint`, which + // rewrites the path to /v0.4/traces for the agent). + assert_eq!(stored.url.to_string(), "http://collector:4318/v1/traces"); + assert_eq!( + cfg.otlp_traces_headers, + vec![("api-key".to_string(), "secret".to_string())] + ); + assert_eq!(cfg.otlp_traces_timeout_ms, 5000); + } + + #[test] + fn set_otlp_traces_endpoint_none_disables_export() { + let mut cfg = Config::default(); + cfg.set_otlp_traces_endpoint( + Some(Endpoint::from_slice("http://collector:4318/v1/traces")), + vec![], + 1000, + ); + assert!(cfg.otlp_traces_endpoint.is_some()); + + cfg.set_otlp_traces_endpoint(None, vec![], 0); + assert!(cfg.otlp_traces_endpoint.is_none()); + } +} diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index 0575c20ccf..8d652e248b 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -10,7 +10,7 @@ use super::json_types::{ use super::OtlpResourceInfo; use crate::span::v04::{Span, SpanEvent, SpanLink}; use crate::span::TraceData; -use std::borrow::Borrow; +use std::borrow::{Borrow, Cow}; /// Maximum number of attributes per span; excess are dropped and counted. const MAX_ATTRIBUTES_PER_SPAN: usize = 128; @@ -31,6 +31,29 @@ pub fn map_traces_to_otlp( trace_chunks: Vec>>, resource_info: &OtlpResourceInfo, ) -> ExportTraceServiceRequest { + // When the caller did not supply env / app_version at the exporter level (e.g. the PHP + // sidecar path, where these are dynamic per-trace rather than fixed exporter config), + // fall back to the `env` / `version` meta tags carried on the trace's spans. This is a + // fallback only: callers that already set env / app_version (dd-trace-rs, dd-trace-py) + // keep their exporter-level values untouched. + let resource_info: Cow<'_, OtlpResourceInfo> = + if resource_info.env.is_empty() || resource_info.app_version.is_empty() { + let mut owned = resource_info.clone(); + if owned.env.is_empty() { + if let Some(env) = search_chunks_for_meta(&trace_chunks, "env") { + owned.env = env; + } + } + if owned.app_version.is_empty() { + if let Some(version) = search_chunks_for_meta(&trace_chunks, "version") { + owned.app_version = version; + } + } + Cow::Owned(owned) + } else { + Cow::Borrowed(resource_info) + }; + let resource_info = resource_info.as_ref(); let resource = build_resource(resource_info); let mut all_spans: Vec = Vec::new(); for chunk in &trace_chunks { @@ -67,6 +90,42 @@ pub fn map_traces_to_otlp( } } +/// Returns the first non-empty value of meta `key` across all trace chunks, preferring the +/// chunk-root span (`parent_id == 0`) of each chunk before scanning the remaining spans. +/// +/// Used to recover resource-level `env` / `version` for export paths (e.g. the sidecar) that do +/// not carry these as fixed exporter configuration but do tag every span with them. +fn search_chunks_for_meta( + trace_chunks: &[Vec>], + key: &str, +) -> Option { + // First pass: chunk-root spans, which carry the authoritative service-entry metadata. + for chunk in trace_chunks { + for span in chunk { + if span.parent_id == 0 { + if let Some(v) = span.meta.get(key) { + let v = v.borrow(); + if !v.is_empty() { + return Some(v.to_string()); + } + } + } + } + } + // Second pass: any span, in case no root span is present in the payload. + for chunk in trace_chunks { + for span in chunk { + if let Some(v) = span.meta.get(key) { + let v = v.borrow(); + if !v.is_empty() { + return Some(v.to_string()); + } + } + } + } + None +} + fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { let mut attributes: Vec = Vec::new(); if !resource_info.service.is_empty() { @@ -905,4 +964,155 @@ mod tests { let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; assert_eq!(otlp_span.flags, Some(0)); } + + /// Returns the string value of a resource-level attribute, or None if absent. + fn resource_attr(req: &ExportTraceServiceRequest, key: &str) -> Option { + let json = serde_json::to_value(req).unwrap(); + json["resourceSpans"][0]["resource"]["attributes"] + .as_array() + .unwrap() + .iter() + .find(|a| a["key"] == key) + .map(|a| a["value"]["stringValue"].as_str().unwrap().to_string()) + } + + #[test] + fn test_env_version_fallback_from_span_meta() { + // Sidecar path: env / app_version are not set at the exporter level, but every span + // carries `env` / `version` meta. They must surface as resource attributes + // deployment.environment.name / service.version (the system-test requirement). + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let mut span: Span = Span { + trace_id: 1, + span_id: 2, + parent_id: 0, + name: libdd_tinybytes::BytesString::from_static("root"), + service: libdd_tinybytes::BytesString::from_static("svc"), + start: 0, + duration: 1, + ..Default::default() + }; + span.meta.insert( + libdd_tinybytes::BytesString::from_static("env"), + libdd_tinybytes::BytesString::from_static("system-tests"), + ); + span.meta.insert( + libdd_tinybytes::BytesString::from_static("version"), + libdd_tinybytes::BytesString::from_static("1.0.0"), + ); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + assert_eq!( + resource_attr(&req, "deployment.environment.name").as_deref(), + Some("system-tests") + ); + assert_eq!( + resource_attr(&req, "service.version").as_deref(), + Some("1.0.0") + ); + } + + #[test] + fn test_exporter_env_version_take_precedence_over_meta() { + // When env / app_version are set at the exporter level (dd-trace-rs / dd-trace-py), + // span meta must NOT override them. + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + env: "exporter-env".to_string(), + app_version: "9.9.9".to_string(), + ..Default::default() + }; + let mut span: Span = Span { + trace_id: 1, + span_id: 2, + parent_id: 0, + name: libdd_tinybytes::BytesString::from_static("root"), + start: 0, + duration: 1, + ..Default::default() + }; + span.meta.insert( + libdd_tinybytes::BytesString::from_static("env"), + libdd_tinybytes::BytesString::from_static("meta-env"), + ); + span.meta.insert( + libdd_tinybytes::BytesString::from_static("version"), + libdd_tinybytes::BytesString::from_static("1.0.0"), + ); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + assert_eq!( + resource_attr(&req, "deployment.environment.name").as_deref(), + Some("exporter-env") + ); + assert_eq!( + resource_attr(&req, "service.version").as_deref(), + Some("9.9.9") + ); + } + + #[test] + fn test_env_version_fallback_prefers_root_span() { + // The chunk-root span (parent_id == 0) is the authoritative source for env / version; + // a child span with conflicting meta must not win. + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let mut child: Span = Span { + trace_id: 1, + span_id: 3, + parent_id: 2, + name: libdd_tinybytes::BytesString::from_static("child"), + start: 0, + duration: 1, + ..Default::default() + }; + child.meta.insert( + libdd_tinybytes::BytesString::from_static("env"), + libdd_tinybytes::BytesString::from_static("child-env"), + ); + let mut root: Span = Span { + trace_id: 1, + span_id: 2, + parent_id: 0, + name: libdd_tinybytes::BytesString::from_static("root"), + start: 0, + duration: 1, + ..Default::default() + }; + root.meta.insert( + libdd_tinybytes::BytesString::from_static("env"), + libdd_tinybytes::BytesString::from_static("root-env"), + ); + // child first to ensure ordering does not decide the result + let req = map_traces_to_otlp(vec![vec![child, root]], &resource_info); + assert_eq!( + resource_attr(&req, "deployment.environment.name").as_deref(), + Some("root-env") + ); + } + + #[test] + fn test_no_env_version_meta_omits_resource_attrs() { + // No exporter-level env / version and no span meta: the resource attributes must be + // omitted entirely (default behavior unchanged). + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let span: Span = Span { + trace_id: 1, + span_id: 2, + parent_id: 0, + name: libdd_tinybytes::BytesString::from_static("root"), + start: 0, + duration: 1, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + assert_eq!(resource_attr(&req, "deployment.environment.name"), None); + assert_eq!(resource_attr(&req, "service.version"), None); + } }