diff --git a/Cargo.lock b/Cargo.lock index fff06dbaa3..333063960c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3006,6 +3006,7 @@ dependencies = [ "libdd-trace-protobuf", "libdd-trace-stats", "libdd-trace-utils", + "prost", "rand 0.8.5", "regex", "rmp-serde", @@ -3455,6 +3456,7 @@ dependencies = [ "flate2", "futures", "getrandom 0.2.15", + "hex", "http", "http-body", "http-body-util", diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 5271c86e63..409ad0e2f2 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -83,6 +83,7 @@ pub struct TraceExporterConfig { connection_timeout: Option, shared_runtime: Option>, otlp_endpoint: Option, + otlp_protocol: Option, } #[no_mangle] @@ -498,6 +499,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( ) } +/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or +/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value +/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`). +/// +/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted +/// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( + config: Option<&mut TraceExporterConfig>, + protocol: CharSlice, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + let value = match sanitize_string(protocol) { + Ok(s) => s, + Err(e) => return Some(e), + }; + match value.as_str() { + "http/json" | "http/protobuf" => { + handle.otlp_protocol = Some(value); + None + } + _ => gen_error!(ErrorCode::InvalidArgument), + } + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Create a new TraceExporter instance. /// /// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in @@ -565,6 +597,13 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( if let Some(ref url) = config.otlp_endpoint { builder.set_otlp_endpoint(url); + if let Some(ref proto) = config.otlp_protocol { + // The FFI setter only stores "http/json"/"http/protobuf", so this parse always + // succeeds here; a parse failure just leaves the builder's default protocol. + if let Ok(p) = proto.parse::() { + builder.set_otlp_protocol(p); + } + } } match builder.build() { @@ -1283,6 +1322,69 @@ mod tests { } } + #[test] + fn config_otlp_protocol_test() { + unsafe { + // Null config → InvalidArgument + let error = + ddog_trace_exporter_config_set_otlp_protocol(None, CharSlice::from("http/json")); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // "http/json" → success, stored + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("http/json"), + ); + assert_eq!(error, None); + assert_eq!( + config.as_ref().unwrap().otlp_protocol.as_deref(), + Some("http/json") + ); + + // "http/protobuf" → success, stored + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("http/protobuf"), + ); + assert_eq!(error, None); + assert_eq!( + config.as_ref().unwrap().otlp_protocol.as_deref(), + Some("http/protobuf") + ); + + // "grpc" → InvalidArgument + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("grpc"), + ); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // Garbage value → InvalidArgument + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("nonsense"), + ); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // Non-UTF-8 input → InvalidInput + let mut config = Some(TraceExporterConfig::default()); + let invalid: [u8; 2] = [0x80u8, 0xFFu8]; + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from_bytes(&invalid), + ); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidInput); + ddog_trace_exporter_error_free(error); + } + } + #[cfg(all(feature = "catch_panic", panic = "unwind"))] #[test] fn catch_panic_test() { diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index bb93a10a59..85681e902f 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -73,6 +73,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils", features = [ "test-utils", ] } httpmock = "0.8.0-alpha.1" +prost = "0.14.1" rand = "0.8.5" tempfile = "3.3.0" tokio = { version = "1.23", features = [ diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 2b9955ce3d..f34613bd51 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -13,7 +13,7 @@ pub mod agent_info; mod health_metrics; -pub(crate) mod otlp; +pub mod otlp; #[cfg(feature = "telemetry")] pub(crate) mod telemetry; #[cfg(not(target_arch = "wasm32"))] diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 02d7a45f80..e48f3961bd 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -6,20 +6,31 @@ use http::HeaderMap; use std::time::Duration; -/// OTLP trace export protocol. HTTP/JSON is currently supported. +/// OTLP trace export protocol. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub(crate) enum OtlpProtocol { +pub enum OtlpProtocol { /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. #[default] HttpJson, - /// HTTP with protobuf body. (Not supported yet) - #[allow(dead_code)] + /// HTTP with protobuf body (Content-Type: application/x-protobuf). HttpProtobuf, - /// gRPC. (Not supported yet) - #[allow(dead_code)] + /// gRPC. Parsed by `FromStr` so callers get a clean error, but rejected at export time + /// (unsupported). Grpc, } +impl std::str::FromStr for OtlpProtocol { + type Err = String; + fn from_str(s: &str) -> Result { + match s { + "http/json" => Ok(OtlpProtocol::HttpJson), + "http/protobuf" => Ok(OtlpProtocol::HttpProtobuf), + "grpc" => Ok(OtlpProtocol::Grpc), + other => Err(format!("unknown OTLP protocol: {other}")), + } + } +} + /// Default timeout for OTLP export requests. pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10); @@ -32,7 +43,25 @@ pub struct OtlpTraceConfig { pub headers: HeaderMap, /// Request timeout. pub timeout: Duration, - /// Protocol (for future use; currently only HttpJson is supported). - #[allow(dead_code)] - pub(crate) protocol: OtlpProtocol, + /// OTLP export protocol (selects body encoding and content-type). + pub protocol: OtlpProtocol, +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + #[test] + fn protocol_from_str() { + assert_eq!( + OtlpProtocol::from_str("http/json").unwrap(), + OtlpProtocol::HttpJson + ); + assert_eq!( + OtlpProtocol::from_str("http/protobuf").unwrap(), + OtlpProtocol::HttpProtobuf + ); + assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); + assert!(OtlpProtocol::from_str("nonsense").is_err()); + } } diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 1f4d86a235..ba7ccef060 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -16,7 +16,9 @@ const OTLP_MAX_RETRIES: u32 = 4; /// Initial backoff between retries (milliseconds). const OTLP_RETRY_DELAY_MS: u64 = 100; -/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. +/// Send an OTLP trace payload to the configured endpoint with retries. +/// +/// The body encoding and `Content-Type` are selected from `config.protocol`. /// /// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. /// @@ -26,7 +28,7 @@ pub async fn send_otlp_traces_http( capabilities: &C, config: &OtlpTraceConfig, test_token: Option<&str>, - json_body: Vec, + body: Vec, ) -> Result<(), TraceExporterError> { let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( @@ -41,11 +43,19 @@ pub async fn send_otlp_traces_http( ..Endpoint::default() }; + // `Grpc` is rejected earlier in `send_otlp_traces_inner` and never reaches this function, so it + // is grouped with the JSON content-type here only to keep the match exhaustive. + let content_type = match config.protocol { + crate::otlp::config::OtlpProtocol::HttpProtobuf => { + libdd_common::header::APPLICATION_PROTOBUF + } + crate::otlp::config::OtlpProtocol::HttpJson | crate::otlp::config::OtlpProtocol::Grpc => { + libdd_common::header::APPLICATION_JSON + } + }; + let mut headers = config.headers.clone(); - headers.insert( - http::header::CONTENT_TYPE, - libdd_common::header::APPLICATION_JSON, - ); + headers.insert(http::header::CONTENT_TYPE, content_type); if let Some(token) = test_token { if let Ok(val) = http::HeaderValue::from_str(token) { headers.insert( @@ -62,7 +72,7 @@ pub async fn send_otlp_traces_http( None, ); - match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await { + match send_with_retry(capabilities, &target, body, &headers, &retry_strategy).await { Ok(_) => Ok(()), Err(e) => Err(map_send_error(e).await), } diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 658fc13b87..adde33396b 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -5,8 +5,9 @@ //! //! When an OTLP endpoint is configured via //! [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], the trace exporter sends -//! traces in OTLP HTTP/JSON format to that endpoint instead of the Datadog agent. The host language -//! is responsible for resolving the endpoint from its own configuration (e.g. +//! traces in OTLP HTTP format to that endpoint instead of the Datadog agent; the wire encoding +//! (JSON or protobuf) is selected via [`OtlpProtocol`]. The host language is responsible for +//! resolving the endpoint from its own configuration (e.g. //! `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`). //! //! ## Sampling @@ -22,9 +23,9 @@ //! spans from a local trace are closed (i.e. send complete trace chunks). This crate does not //! buffer or flush partially—it exports whatever trace chunks it receives. -pub mod config; -pub mod exporter; +pub(crate) mod config; +pub(crate) mod exporter; -pub use config::OtlpTraceConfig; -pub use exporter::send_otlp_traces_http; +pub use config::{OtlpProtocol, OtlpTraceConfig}; +pub(crate) use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 3c0e1f14b5..1bccb8c04c 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -65,6 +65,7 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + otlp_protocol: OtlpProtocol, } impl TraceExporterBuilder { @@ -286,6 +287,17 @@ impl TraceExporterBuilder { self } + /// Selects the OTLP export protocol. Accepts `OtlpProtocol::HttpJson` (default) or + /// `OtlpProtocol::HttpProtobuf`. The host language resolves this from + /// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`. + /// + /// `OtlpProtocol::Grpc` is not supported; selecting it makes [`build`](Self::build) / + /// [`build_async`](Self::build_async) fail with [`BuilderErrorKind::InvalidConfiguration`]. + pub fn set_otlp_protocol(&mut self, protocol: OtlpProtocol) -> &mut Self { + self.otlp_protocol = protocol; + self + } + /// Sets additional HTTP headers to include in OTLP trace export requests. /// /// Headers should be provided as key-value pairs. The host language is responsible for @@ -332,6 +344,18 @@ impl TraceExporterBuilder { )); } + // OTLP gRPC export is not implemented. Reject it here so a misconfigured exporter fails + // fast at build time with a clear `InvalidConfiguration` (FFI: `InvalidArgument`), matching + // the C FFI `set_otlp_protocol` setter, rather than erroring on every send. The send-time + // arm in `send_otlp_traces_inner` remains as a defensive guard. + if self.otlp_protocol == OtlpProtocol::Grpc { + return Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration( + "OTLP gRPC export is not supported".to_string(), + ), + )); + } + let shared_runtime = match self.shared_runtime { Some(rt) => rt, None => Self::new_shared_runtime()?, @@ -451,7 +475,7 @@ impl TraceExporterBuilder { .connection_timeout .map(Duration::from_millis) .unwrap_or(DEFAULT_OTLP_TIMEOUT), - protocol: OtlpProtocol::HttpJson, + protocol: self.otlp_protocol, } }); @@ -672,6 +696,21 @@ mod tests { )); } + #[test] + fn test_otlp_grpc_protocol_rejected_at_build() { + // gRPC is unsupported and must fail fast at build time (not on the first send), with the + // same `InvalidConfiguration` category the C FFI setter uses. + let mut builder = TraceExporterBuilder::default(); + builder.set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!(matches!( + result, + Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration(_) + )) + )); + } + #[cfg_attr(miri, ignore)] #[test] fn test_build_with_v1_starts_inactive() { diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 070fc754e0..10d5ac95f1 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -15,7 +15,9 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; -use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; +use crate::otlp::{ + map_traces_to_otlp, send_otlp_traces_http, OtlpProtocol, OtlpResourceInfo, OtlpTraceConfig, +}; #[cfg(feature = "telemetry")] use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ @@ -546,15 +548,28 @@ impl Tra r }; let request = map_traces_to_otlp(traces, &resource_info); - let json_body = serde_json::to_vec(&request).map_err(|e| { - error!("OTLP JSON serialization error: {e}"); - TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) - })?; + let body = match config.protocol { + OtlpProtocol::HttpJson => libdd_trace_utils::otlp_encoder::encode_otlp_json(&request) + .map_err(|e| { + error!("OTLP JSON serialization error: {e}"); + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) + })?, + OtlpProtocol::HttpProtobuf => { + libdd_trace_utils::otlp_encoder::encode_otlp_protobuf(&request) + } + OtlpProtocol::Grpc => { + return Err(TraceExporterError::Internal( + InternalErrorKind::InvalidWorkerState( + "OTLP gRPC export is not supported".to_string(), + ), + )); + } + }; send_otlp_traces_http( &self.capabilities, config, self.endpoint.test_token.as_deref(), - json_body, + body, ) .await?; Ok(AgentResponse::Unchanged) diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_protobuf_export.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_protobuf_export.rs new file mode 100644 index 0000000000..2d193ed387 --- /dev/null +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_protobuf_export.rs @@ -0,0 +1,86 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 +#[cfg(test)] +mod otlp_protobuf_tests { + use libdd_capabilities_impl::NativeCapabilities; + use libdd_data_pipeline::trace_exporter::TraceExporterBuilder; + use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; + use libdd_trace_utils::test_utils::create_test_json_span; + use prost::Message; + use serde_json::json; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use tokio::task; + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn otlp_protobuf_export_sends_decodable_payload() { + use httpmock::MockServer; + + // The httpmock 0.8 alpha API does not expose captured request bodies after the fact, so + // we decode and validate the protobuf body inside a custom request matcher. The matcher + // flips `body_valid` when the payload decodes and carries the expected service.name. + let body_valid = Arc::new(AtomicBool::new(false)); + let matcher_flag = body_valid.clone(); + + let server = MockServer::start_async().await; + let mock = server + .mock_async(move |when, then| { + let flag = matcher_flag.clone(); + when.method("POST") + .path("/v1/traces") + .header("content-type", "application/x-protobuf") + .is_true(move |req: &httpmock::prelude::HttpMockRequest| { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value; + let Ok(decoded) = ExportTraceServiceRequest::decode(req.body_ref()) else { + return false; + }; + let valid = decoded + .resource_spans + .first() + .and_then(|rs| rs.resource.as_ref()) + .map(|resource| { + resource.attributes.iter().any(|kv| { + kv.key == "service.name" + && matches!( + kv.value.as_ref().and_then(|v| v.value.as_ref()), + Some(Value::StringValue(s)) if s == "test" + ) + }) + }) + .unwrap_or(false); + if valid { + flag.store(true, Ordering::SeqCst); + } + valid + }); + then.status(200).body(""); + }) + .await; + + let endpoint = format!("http://localhost:{}/v1/traces", server.port()); + let task_result = task::spawn_blocking(move || { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint(&endpoint) + .set_otlp_protocol(libdd_data_pipeline::otlp::OtlpProtocol::HttpProtobuf) + .set_language("test-lang") + .set_tracer_version("1.0") + .set_env("test_env") + .set_service("test"); + let exporter = builder.build::().expect("build"); + let mut span = create_test_json_span(1234, 12342, 12341, 1, false); + span["name"] = json!("pb_span"); + let data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + exporter.send(data.as_ref()).expect("send ok"); + }) + .await; + + assert!(task_result.is_ok()); + assert_eq!(mock.calls_async().await, 1); + assert!( + body_valid.load(Ordering::SeqCst), + "protobuf body did not decode to the expected ExportTraceServiceRequest" + ); + } +} diff --git a/libdd-trace-protobuf/build.rs b/libdd-trace-protobuf/build.rs index c9c891a681..aee2ebd246 100644 --- a/libdd-trace-protobuf/build.rs +++ b/libdd-trace-protobuf/build.rs @@ -36,6 +36,15 @@ fn generate_protobuf() { config.out_dir(output_path.clone()); + // The vendored OpenTelemetry trace protos carry doc comments with indented example blocks + // (e.g. on `Span.attributes`) that rustdoc would interpret as Rust doctests and fail to + // compile. Drop the generated comments for these packages; the vendored `.proto` files remain + // the documentation source of truth. + config.disable_comments([ + ".opentelemetry.proto.trace.v1", + ".opentelemetry.proto.collector.trace.v1", + ]); + // The following prost_build config changes modify the protobuf generated structs in // in the following ways: @@ -62,9 +71,14 @@ fn generate_protobuf() { config.field_attribute(".pb.SpanLink.tracestate", "#[serde(default)]"); config.field_attribute(".pb.SpanLink.flags", "#[serde(default)]"); - config.type_attribute("Span", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("pb.Span", "#[derive(Deserialize, Serialize)]"); config.type_attribute( - "Span", + "pb.Span", + r#"#[cfg_attr(feature = "fuzzing", derive(bolero::TypeGenerator))]"#, + ); + config.type_attribute("pb.idx.Span", "#[derive(Deserialize, Serialize)]"); + config.type_attribute( + "pb.idx.Span", r#"#[cfg_attr(feature = "fuzzing", derive(bolero::TypeGenerator))]"#, ); config.field_attribute( @@ -319,6 +333,8 @@ fn generate_protobuf() { "src/pb/stats.proto", "src/pb/remoteconfig.proto", "src/pb/opentelemetry/proto/common/v1/process_context.proto", + "src/pb/opentelemetry/proto/trace/v1/trace.proto", + "src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto", "src/pb/idx/tracer_payload.proto", "src/pb/idx/span.proto", ], @@ -363,6 +379,14 @@ fn generate_protobuf() { otel_license, &output_path.join("opentelemetry.proto.common.v1.rs"), ); + prepend_to_file( + otel_license, + &output_path.join("opentelemetry.proto.trace.v1.rs"), + ); + prepend_to_file( + otel_license, + &output_path.join("opentelemetry.proto.collector.trace.v1.rs"), + ); } #[cfg(feature = "generate-protobuf")] diff --git a/libdd-trace-protobuf/src/_includes.rs b/libdd-trace-protobuf/src/_includes.rs index 1628f52c39..0377bbe9dc 100644 --- a/libdd-trace-protobuf/src/_includes.rs +++ b/libdd-trace-protobuf/src/_includes.rs @@ -4,6 +4,13 @@ // This file is @generated by prost-build. pub mod opentelemetry { pub mod proto { + pub mod collector { + pub mod trace { + pub mod v1 { + include!("opentelemetry.proto.collector.trace.v1.rs"); + } + } + } pub mod common { pub mod v1 { include!("opentelemetry.proto.common.v1.rs"); @@ -14,6 +21,11 @@ pub mod opentelemetry { include!("opentelemetry.proto.resource.v1.rs"); } } + pub mod trace { + pub mod v1 { + include!("opentelemetry.proto.trace.v1.rs"); + } + } } } pub mod pb { diff --git a/libdd-trace-protobuf/src/opentelemetry.proto.collector.trace.v1.rs b/libdd-trace-protobuf/src/opentelemetry.proto.collector.trace.v1.rs new file mode 100644 index 0000000000..fbd03366a9 --- /dev/null +++ b/libdd-trace-protobuf/src/opentelemetry.proto.collector.trace.v1.rs @@ -0,0 +1,23 @@ +// Copyright 2019, OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExportTraceServiceRequest { + #[prost(message, repeated, tag = "1")] + pub resource_spans: ::prost::alloc::vec::Vec< + super::super::super::trace::v1::ResourceSpans, + >, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ExportTraceServiceResponse { + #[prost(message, optional, tag = "1")] + pub partial_success: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ExportTracePartialSuccess { + #[prost(int64, tag = "1")] + pub rejected_spans: i64, + #[prost(string, tag = "2")] + pub error_message: ::prost::alloc::string::String, +} diff --git a/libdd-trace-protobuf/src/opentelemetry.proto.trace.v1.rs b/libdd-trace-protobuf/src/opentelemetry.proto.trace.v1.rs new file mode 100644 index 0000000000..d1d035f759 --- /dev/null +++ b/libdd-trace-protobuf/src/opentelemetry.proto.trace.v1.rs @@ -0,0 +1,224 @@ +// Copyright 2019, OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TracesData { + #[prost(message, repeated, tag = "1")] + pub resource_spans: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceSpans { + #[prost(message, optional, tag = "1")] + pub resource: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub scope_spans: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScopeSpans { + #[prost(message, optional, tag = "1")] + pub scope: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub spans: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Span { + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub span_id: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub trace_state: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "4")] + pub parent_span_id: ::prost::alloc::vec::Vec, + #[prost(fixed32, tag = "16")] + pub flags: u32, + #[prost(string, tag = "5")] + pub name: ::prost::alloc::string::String, + #[prost(enumeration = "span::SpanKind", tag = "6")] + pub kind: i32, + #[prost(fixed64, tag = "7")] + pub start_time_unix_nano: u64, + #[prost(fixed64, tag = "8")] + pub end_time_unix_nano: u64, + #[prost(message, repeated, tag = "9")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "10")] + pub dropped_attributes_count: u32, + #[prost(message, repeated, tag = "11")] + pub events: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "12")] + pub dropped_events_count: u32, + #[prost(message, repeated, tag = "13")] + pub links: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "14")] + pub dropped_links_count: u32, + #[prost(message, optional, tag = "15")] + pub status: ::core::option::Option, +} +/// Nested message and enum types in `Span`. +pub mod span { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Event { + #[prost(fixed64, tag = "1")] + pub time_unix_nano: u64, + #[prost(string, tag = "2")] + pub name: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec< + super::super::super::common::v1::KeyValue, + >, + #[prost(uint32, tag = "4")] + pub dropped_attributes_count: u32, + } + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Link { + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub span_id: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub trace_state: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "4")] + pub attributes: ::prost::alloc::vec::Vec< + super::super::super::common::v1::KeyValue, + >, + #[prost(uint32, tag = "5")] + pub dropped_attributes_count: u32, + #[prost(fixed32, tag = "6")] + pub flags: u32, + } + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum SpanKind { + Unspecified = 0, + Internal = 1, + Server = 2, + Client = 3, + Producer = 4, + Consumer = 5, + } + impl SpanKind { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "SPAN_KIND_UNSPECIFIED", + Self::Internal => "SPAN_KIND_INTERNAL", + Self::Server => "SPAN_KIND_SERVER", + Self::Client => "SPAN_KIND_CLIENT", + Self::Producer => "SPAN_KIND_PRODUCER", + Self::Consumer => "SPAN_KIND_CONSUMER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SPAN_KIND_UNSPECIFIED" => Some(Self::Unspecified), + "SPAN_KIND_INTERNAL" => Some(Self::Internal), + "SPAN_KIND_SERVER" => Some(Self::Server), + "SPAN_KIND_CLIENT" => Some(Self::Client), + "SPAN_KIND_PRODUCER" => Some(Self::Producer), + "SPAN_KIND_CONSUMER" => Some(Self::Consumer), + _ => None, + } + } + } +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct Status { + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + #[prost(enumeration = "status::StatusCode", tag = "3")] + pub code: i32, +} +/// Nested message and enum types in `Status`. +pub mod status { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum StatusCode { + Unset = 0, + Ok = 1, + Error = 2, + } + impl StatusCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unset => "STATUS_CODE_UNSET", + Self::Ok => "STATUS_CODE_OK", + Self::Error => "STATUS_CODE_ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "STATUS_CODE_UNSET" => Some(Self::Unset), + "STATUS_CODE_OK" => Some(Self::Ok), + "STATUS_CODE_ERROR" => Some(Self::Error), + _ => None, + } + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum SpanFlags { + DoNotUse = 0, + TraceFlagsMask = 255, + ContextHasIsRemoteMask = 256, + ContextIsRemoteMask = 512, +} +impl SpanFlags { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::DoNotUse => "SPAN_FLAGS_DO_NOT_USE", + Self::TraceFlagsMask => "SPAN_FLAGS_TRACE_FLAGS_MASK", + Self::ContextHasIsRemoteMask => "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK", + Self::ContextIsRemoteMask => "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SPAN_FLAGS_DO_NOT_USE" => Some(Self::DoNotUse), + "SPAN_FLAGS_TRACE_FLAGS_MASK" => Some(Self::TraceFlagsMask), + "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK" => Some(Self::ContextHasIsRemoteMask), + "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK" => Some(Self::ContextIsRemoteMask), + _ => None, + } + } +} diff --git a/libdd-trace-protobuf/src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto b/libdd-trace-protobuf/src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto new file mode 100644 index 0000000000..1e77256209 --- /dev/null +++ b/libdd-trace-protobuf/src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto @@ -0,0 +1,80 @@ +// This file was vendored from open-telemetry/opentelemetry-proto at commit +// 1e725b853bc8f6b46ee62e8232e4c83017b9536f. + +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.collector.trace.v1; + +import "opentelemetry/proto/trace/v1/trace.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Collector.Trace.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.collector.trace.v1"; +option java_outer_classname = "TraceServiceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/collector/trace/v1"; + +// Service that can be used to push spans between one Application instrumented with +// OpenTelemetry and a collector, or between a collector and a central collector (in this +// case spans are sent/received to/from multiple Applications). +service TraceService { + rpc Export(ExportTraceServiceRequest) returns (ExportTraceServiceResponse) {} +} + +message ExportTraceServiceRequest { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain one + // element. Intermediary nodes (such as OpenTelemetry Collector) that receive + // data from multiple origins typically batch the data before forwarding further and + // in that case this array will contain multiple elements. + repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1; +} + +message ExportTraceServiceResponse { + // The details of a partially successful export request. + // + // If the request is only partially accepted + // (i.e. when the server accepts only parts of the data and rejects the rest) + // the server MUST initialize the `partial_success` field and MUST + // set the `rejected_` with the number of items it rejected. + // + // Servers MAY also make use of the `partial_success` field to convey + // warnings/suggestions to senders even when the request was fully accepted. + // In such cases, the `rejected_` MUST have a value of `0` and + // the `error_message` MUST be non-empty. + // + // A `partial_success` message with an empty value (rejected_ = 0 and + // `error_message` = "") is equivalent to it not being set/present. Senders + // SHOULD interpret it the same way as in the full success case. + ExportTracePartialSuccess partial_success = 1; +} + +message ExportTracePartialSuccess { + // The number of rejected spans. + // + // A `rejected_` field holding a `0` value indicates that the + // request was fully accepted. + int64 rejected_spans = 1; + + // A developer-facing human-readable message in English. It should be used + // either to explain why the server rejected parts of the data during a partial + // success or to convey warnings/suggestions during a full success. The message + // should offer guidance on how users can address such issues. + // + // error_message is an optional field. An error_message with an empty value + // is equivalent to it not being set. + string error_message = 2; +} \ No newline at end of file diff --git a/libdd-trace-protobuf/src/pb/opentelemetry/proto/trace/v1/trace.proto b/libdd-trace-protobuf/src/pb/opentelemetry/proto/trace/v1/trace.proto new file mode 100644 index 0000000000..69564c256a --- /dev/null +++ b/libdd-trace-protobuf/src/pb/opentelemetry/proto/trace/v1/trace.proto @@ -0,0 +1,362 @@ +// This file was vendored from open-telemetry/opentelemetry-proto at commit +// 1e725b853bc8f6b46ee62e8232e4c83017b9536f. + +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.trace.v1; + +import "opentelemetry/proto/common/v1/common.proto"; +import "opentelemetry/proto/resource/v1/resource.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Trace.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.trace.v1"; +option java_outer_classname = "TraceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/trace/v1"; + +// TracesData represents the traces data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP traces data but do +// not implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message TracesData { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceSpans resource_spans = 1; +} + +// A collection of ScopeSpans from a Resource. +message ResourceSpans { + reserved 1000; + + // The resource for the spans in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeSpans that originate from a resource. + repeated ScopeSpans scope_spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the resource data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_spans" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationScope. +message ScopeSpans { + // The instrumentation scope information for the spans in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of Spans that originate from an instrumentation scope. + repeated Span spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the span data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "scope" field and all spans and span + // events in the "spans" field. + string schema_url = 3; +} + +// A Span represents a single operation performed by a single component of the system. +// +// The next available field id is 17. +message Span { + // A unique identifier for a trace. All spans from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR + // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes trace_id = 1; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes OR of length + // other than 8 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes span_id = 2; + + // trace_state conveys information about request position in multiple distributed tracing graphs. + // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header + // See also https://github.com/w3c/distributed-tracing for more details about this field. + string trace_state = 3; + + // The `span_id` of this span's parent span. If this is a root span, then this + // field must be empty. The ID is an 8-byte array. + bytes parent_span_id = 4; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether a span's parent + // is remote. The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the span is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // When creating span messages, if the message is logically forwarded from another source + // with an equivalent flags fields (i.e., usually another OTLP span message), the field SHOULD + // be copied as-is. If creating from a source that does not have an equivalent flags field + // (such as a runtime representation of an OpenTelemetry span), the high 22 bits MUST + // be set to zero. + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // + // [Optional]. + fixed32 flags = 16; + + // A description of the span's operation. + // + // For example, the name can be a qualified method name or a file name + // and a line number where the operation is called. A best practice is to use + // the same display name at the same call point in an application. + // This makes it easier to correlate spans in different traces. + // + // This field is semantically required to be set to non-empty string. + // Empty value is equivalent to an unknown span name. + // + // This field is required. + string name = 5; + + // SpanKind is the type of span. Can be used to specify additional relationships between spans + // in addition to a parent/child relationship. + enum SpanKind { + // Unspecified. Do NOT use as default. + // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + SPAN_KIND_UNSPECIFIED = 0; + + // Indicates that the span represents an internal operation within an application, + // as opposed to an operation happening at the boundaries. Default value. + SPAN_KIND_INTERNAL = 1; + + // Indicates that the span covers server-side handling of an RPC or other + // remote network request. + SPAN_KIND_SERVER = 2; + + // Indicates that the span describes a request to some remote service. + SPAN_KIND_CLIENT = 3; + + // Indicates that the span describes a producer sending a message to a broker. + // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + // between producer and consumer spans. A PRODUCER span ends when the message was accepted + // by the broker while the logical processing of the message might span a much longer time. + SPAN_KIND_PRODUCER = 4; + + // Indicates that the span describes consumer receiving a message from a broker. + // Like the PRODUCER kind, there is often no direct critical path latency relationship + // between producer and consumer spans. + SPAN_KIND_CONSUMER = 5; + } + + // Distinguishes between spans generated in a particular context. For example, + // two spans with the same name may be distinguished using `CLIENT` (caller) + // and `SERVER` (callee) to identify queueing latency associated with the span. + SpanKind kind = 6; + + // The start time of the span. On the client side, this is the time + // kept by the local machine where the span execution starts. On the server side, this + // is the time when the server's application handler starts running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 start_time_unix_nano = 7; + + // The end time of the span. On the client side, this is the time + // kept by the local machine where the span execution ends. On the server side, this + // is the time when the server application handler stops running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 end_time_unix_nano = 8; + + // A collection of key/value pairs. Note, global attributes + // like server name can be set using the resource API. Examples of attributes: + // + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "example.com/myattribute": true + // "example.com/score": 10.239 + // + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // The number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 10; + + // Event is a time-stamped annotation of the span, consisting of user-supplied + // text description and key-value pairs. + message Event { + // The time the event occurred. + fixed64 time_unix_nano = 1; + + // The name of the event. + // This field is semantically required to be set to non-empty string. + string name = 2; + + // A collection of attribute key/value pairs on the event. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 3; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 4; + } + + // A collection of Event items. + repeated Event events = 11; + + // The number of dropped events. If the value is 0, then no + // events were dropped. + uint32 dropped_events_count = 12; + + // A pointer from the current span to another span in the same trace or in a + // different trace. For example, this can be used in batching operations, + // where a single batch handler processes multiple requests from different + // traces or when the handler receives a request from a different project. + message Link { + // A unique identifier of a trace that this linked span is part of. The ID is a + // 16-byte array. + bytes trace_id = 1; + + // A unique identifier for the linked span. The ID is an 8-byte array. + bytes span_id = 2; + + // The trace_state associated with the link. + string trace_state = 3; + + // A collection of attribute key/value pairs on the link. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 5; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether the link is remote. + // The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the link is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // When creating new spans, bits 10-31 (most-significant 22-bits) MUST be zero. + // + // [Optional]. + fixed32 flags = 6; + } + + // A collection of Links, which are references from this span to a span + // in the same or different trace. + repeated Link links = 13; + + // The number of dropped links after the maximum size was + // enforced. If this value is 0, then no links were dropped. + uint32 dropped_links_count = 14; + + // An optional final status for this span. Semantically when Status isn't set, it means + // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + Status status = 15; +} + +// The Status type defines a logical error model that is suitable for different +// programming environments, including REST APIs and RPC APIs. +message Status { + reserved 1; + + // A developer-facing human readable error message. + string message = 2; + + // For the semantics of status codes see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status + enum StatusCode { + // The default status. + STATUS_CODE_UNSET = 0; + // The Span has been validated by an Application developer or Operator to + // have completed successfully. + STATUS_CODE_OK = 1; + // The Span contains an error. + STATUS_CODE_ERROR = 2; + }; + + // The status code. + StatusCode code = 3; +} + +// SpanFlags represents constants used to interpret the +// Span.flags field, which is protobuf 'fixed32' type and is to +// be used as bit-fields. Each non-zero value defined in this enum is +// a bit-mask. To extract the bit-field, for example, use an +// expression like: +// +// (span.flags & SPAN_FLAGS_TRACE_FLAGS_MASK) +// +// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. +// +// Note that Span flags were introduced in version 1.1 of the +// OpenTelemetry protocol. Older Span producers do not set this +// field, consequently consumers should not rely on the absence of a +// particular flag bit to indicate the presence of a particular feature. +enum SpanFlags { + // The zero value for the enum. Should not be used for comparisons. + // Instead use bitwise "and" with the appropriate mask as shown above. + SPAN_FLAGS_DO_NOT_USE = 0; + + // Bits 0-7 are used for trace flags. + SPAN_FLAGS_TRACE_FLAGS_MASK = 0x000000FF; + + // Bits 8 and 9 are used to indicate that the parent span or link span is remote. + // Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known. + // Bit 9 (`IS_REMOTE`) indicates whether the span or link is remote. + SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK = 0x00000100; + SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK = 0x00000200; + + // Bits 10-31 are reserved for future use. +} \ No newline at end of file diff --git a/libdd-trace-utils/Cargo.toml b/libdd-trace-utils/Cargo.toml index 1d9fa07fe7..01b8a6f744 100644 --- a/libdd-trace-utils/Cargo.toml +++ b/libdd-trace-utils/Cargo.toml @@ -70,6 +70,7 @@ libdd-common = { path = "../libdd-common", default-features = false, features = bolero = "0.13" criterion = "0.5.1" httpmock = { version = "0.8.0-alpha.1" } +hex = "0.4" serde_json = "1.0" tokio = { version = "1", features = ["macros", "rt-multi-thread", "test-util"] } libdd-trace-utils = { path = ".", features = ["test-utils"] } diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs b/libdd-trace-utils/src/otlp_encoder/mod.rs index 782a10e10d..6c72f03493 100644 --- a/libdd-trace-utils/src/otlp_encoder/mod.rs +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -5,9 +5,76 @@ pub mod json_types; pub mod mapper; +pub mod proto_convert; +pub use json_types::ExportTraceServiceRequest; pub use mapper::map_traces_to_otlp; +use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoExportTraceServiceRequest; +use prost::Message; + +/// Serialize an OTLP request to the HTTP/JSON wire format. +pub fn encode_otlp_json(req: &ExportTraceServiceRequest) -> serde_json::Result> { + serde_json::to_vec(req) +} + +/// Serialize an OTLP request to the HTTP/protobuf wire format. +pub fn encode_otlp_protobuf(req: &ExportTraceServiceRequest) -> Vec { + let proto: ProtoExportTraceServiceRequest = req.into(); + proto.encode_to_vec() +} + +#[cfg(test)] +mod encode_tests { + use super::*; + use crate::span::v04::Span; + use crate::span::BytesData; + use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoReq; + use prost::Message; + + fn sample() -> ExportTraceServiceRequest { + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let span: Span = Span { + trace_id: 0xD269B633813FC60C_u128, + span_id: 0xEEE19B7EC3C1B174, + name: libdd_tinybytes::BytesString::from_static("op"), + resource: libdd_tinybytes::BytesString::from_static("res"), + start: 1, + duration: 2, + ..Default::default() + }; + map_traces_to_otlp(vec![vec![span]], &resource_info) + } + + #[test] + fn json_and_protobuf_carry_same_span() { + let req = sample(); + let json = encode_otlp_json(&req).unwrap(); + let pb = encode_otlp_protobuf(&req); + + let json_v: serde_json::Value = serde_json::from_slice(&json).unwrap(); + let json_name = json_v["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["name"] + .as_str() + .unwrap() + .to_string(); + + let proto = ProtoReq::decode(pb.as_slice()).unwrap(); + let proto_name = proto.resource_spans[0].scope_spans[0].spans[0].name.clone(); + + assert_eq!(json_name, "res"); + assert_eq!(proto_name, "res"); + let json_sid = json_v["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["spanId"] + .as_str() + .unwrap() + .to_string(); + let proto_sid = &proto.resource_spans[0].scope_spans[0].spans[0].span_id; + assert_eq!(json_sid, hex::encode(proto_sid)); + } +} + /// Tracer-level attributes used to populate the OTLP Resource on export. /// /// These are the fields from the tracer's configuration that map to OTLP Resource attributes diff --git a/libdd-trace-utils/src/otlp_encoder/proto_convert.rs b/libdd-trace-utils/src/otlp_encoder/proto_convert.rs new file mode 100644 index 0000000000..a21a88428a --- /dev/null +++ b/libdd-trace-utils/src/otlp_encoder/proto_convert.rs @@ -0,0 +1,403 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Converts the hand-rolled serde OTLP request (the JSON wire model) into the generated +//! prost types for binary (HTTP/protobuf) export. The semantic DD-span -> OTLP mapping already +//! happened in `mapper.rs`; this is a purely structural translation. + +use crate::otlp_encoder::json_types as j; +use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoReq; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + any_value::Value as ProtoValue, AnyValue as ProtoAnyValue, ArrayValue as ProtoArrayValue, + InstrumentationScope as ProtoScope, KeyValue as ProtoKeyValue, +}; +use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource as ProtoResource; +use libdd_trace_protobuf::opentelemetry::proto::trace::v1::{ + span::{Event as ProtoEvent, Link as ProtoLink}, + status::StatusCode as ProtoStatusCode, + ResourceSpans as ProtoResourceSpans, ScopeSpans as ProtoScopeSpans, Span as ProtoSpan, + Status as ProtoStatus, +}; + +/// Decode a fixed-width lowercase hex string into a byte vector. The mapper always produces +/// well-formed hex of the expected width; on a malformed value we fall back to an all-zero +/// buffer of `len` bytes rather than panicking (FFI reliability). +fn hex_to_bytes(s: &str, len: usize) -> Vec { + let bytes = s.as_bytes(); + if bytes.len() != len * 2 { + return vec![0u8; len]; + } + let mut out = Vec::with_capacity(len); + let mut i = 0; + while i < bytes.len() { + match (hex_nibble(bytes[i]), hex_nibble(bytes[i + 1])) { + (Some(hi), Some(lo)) => out.push((hi << 4) | lo), + _ => return vec![0u8; len], + } + i += 2; + } + out +} + +fn hex_nibble(b: u8) -> Option { + match b { + b'0'..=b'9' => Some(b - b'0'), + b'a'..=b'f' => Some(b - b'a' + 10), + b'A'..=b'F' => Some(b - b'A' + 10), + _ => None, + } +} + +/// Parse a decimal timestamp string into `u64`. `mapper.rs` always emits these from `u64`/`i64` +/// fields via `format!`, so a parse failure can only mean a mapper bug; we fall back to 0 rather +/// than panicking (FFI reliability), matching the zero-fallback policy of `hex_to_bytes`. +fn parse_u64(s: &str) -> u64 { + s.parse().unwrap_or(0) +} + +impl From<&j::AnyValue> for ProtoAnyValue { + fn from(v: &j::AnyValue) -> Self { + let value = match v { + j::AnyValue::StringValue(s) => ProtoValue::StringValue(s.clone()), + j::AnyValue::BoolValue(b) => ProtoValue::BoolValue(*b), + j::AnyValue::IntValue(i) => ProtoValue::IntValue(*i), + j::AnyValue::DoubleValue(d) => ProtoValue::DoubleValue(*d), + j::AnyValue::BytesValue(b) => ProtoValue::BytesValue(b.clone()), + j::AnyValue::ArrayValue(a) => ProtoValue::ArrayValue(ProtoArrayValue { + values: a.values.iter().map(ProtoAnyValue::from).collect(), + }), + }; + ProtoAnyValue { value: Some(value) } + } +} + +fn kv(k: &j::KeyValue) -> ProtoKeyValue { + ProtoKeyValue { + key: k.key.clone(), + value: Some(ProtoAnyValue::from(&k.value)), + // `key_ref` and `entity_refs` (on Resource) are profiling-signal-only proto fields, + // unused for traces. Set explicitly to their zero defaults so the converter fails to + // compile if the proto shape changes (rather than silently misusing + // `..Default::default()`). + key_ref: 0, + } +} + +impl From<&j::ExportTraceServiceRequest> for ProtoReq { + fn from(req: &j::ExportTraceServiceRequest) -> Self { + ProtoReq { + resource_spans: req.resource_spans.iter().map(resource_spans).collect(), + } + } +} + +fn resource_spans(rs: &j::ResourceSpans) -> ProtoResourceSpans { + ProtoResourceSpans { + resource: rs.resource.as_ref().map(|r| ProtoResource { + attributes: r.attributes.iter().map(kv).collect(), + dropped_attributes_count: 0, + // `entity_refs` is a profiling-signal-only proto field, unused for traces. + // Explicit default (see `key_ref` note in `kv()`). + entity_refs: Vec::new(), + }), + scope_spans: rs.scope_spans.iter().map(scope_spans).collect(), + schema_url: String::new(), + } +} + +fn scope_spans(ss: &j::ScopeSpans) -> ProtoScopeSpans { + ProtoScopeSpans { + scope: ss.scope.as_ref().map(|s| ProtoScope { + name: s.name.clone().unwrap_or_default(), + version: s.version.clone().unwrap_or_default(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + spans: ss.spans.iter().map(span).collect(), + schema_url: ss.schema_url.clone().unwrap_or_default(), + } +} + +fn span(s: &j::OtlpSpan) -> ProtoSpan { + ProtoSpan { + trace_id: hex_to_bytes(&s.trace_id, 16), + span_id: hex_to_bytes(&s.span_id, 8), + trace_state: s.trace_state.clone().unwrap_or_default(), + parent_span_id: s + .parent_span_id + .as_ref() + .map(|p| hex_to_bytes(p, 8)) + .unwrap_or_default(), + flags: s.flags.unwrap_or(0), + name: s.name.clone(), + // `kind` is a prost open enum (stored as i32); the mapper produces valid SpanKind values, + // and unknown values are passed through unchanged per OTLP open-enum semantics. + kind: s.kind, + start_time_unix_nano: parse_u64(&s.start_time_unix_nano), + end_time_unix_nano: parse_u64(&s.end_time_unix_nano), + attributes: s.attributes.iter().map(kv).collect(), + dropped_attributes_count: s.dropped_attributes_count.unwrap_or(0), + events: s.events.iter().map(event).collect(), + dropped_events_count: s.dropped_events_count.unwrap_or(0), + links: s.links.iter().map(link).collect(), + // The serde `OtlpSpan` model does not track dropped links (the mapper enforces no + // link cap), so 0 is always correct here. + dropped_links_count: 0, + status: Some(ProtoStatus { + message: s.status.message.clone().unwrap_or_default(), + code: status_code(s.status.code), + }), + } +} + +/// Map a serde status-code integer to its prost counterpart. +/// +/// The serde (`json_types::status_code`) and prost (`ProtoStatusCode`) numeric values are +/// intentionally identical — UNSET=0, OK=1, ERROR=2 — so each arm is a no-op in practice. +/// The explicit match is kept as a correctness guard: the `_` arm deliberately clamps any +/// unrecognized value (e.g. a future proto extension not yet reflected in the serde model) +/// to `Unset` rather than forwarding an out-of-range integer to the wire. +fn status_code(code: i32) -> i32 { + match code { + c if c == j::status_code::OK => ProtoStatusCode::Ok as i32, + c if c == j::status_code::ERROR => ProtoStatusCode::Error as i32, + _ => ProtoStatusCode::Unset as i32, + } +} + +fn link(l: &j::OtlpSpanLink) -> ProtoLink { + ProtoLink { + trace_id: hex_to_bytes(&l.trace_id, 16), + span_id: hex_to_bytes(&l.span_id, 8), + trace_state: l.trace_state.clone().unwrap_or_default(), + attributes: l.attributes.iter().map(kv).collect(), + dropped_attributes_count: l.dropped_attributes_count.unwrap_or(0), + // `json_types::OtlpSpanLink` has no `flags` field, so 0 is the faithful value. + flags: 0, + } +} + +fn event(e: &j::OtlpSpanEvent) -> ProtoEvent { + ProtoEvent { + time_unix_nano: parse_u64(&e.time_unix_nano), + name: e.name.clone(), + attributes: e.attributes.iter().map(kv).collect(), + dropped_attributes_count: e.dropped_attributes_count.unwrap_or(0), + } +} + +#[cfg(test)] +mod tests { + use super::hex_to_bytes; + use crate::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; + use crate::span::v04::Span; + use crate::span::BytesData; + use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoReq; + use libdd_trace_protobuf::opentelemetry::proto::trace::v1::status::StatusCode as ProtoStatusCode; + + #[test] + fn converts_ids_and_attributes_to_proto() { + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let mut span: Span = Span { + trace_id: 0xD269B633813FC60C_u128, + span_id: 0xEEE19B7EC3C1B174, + parent_id: 0xEEE19B7EC3C1B173, + name: libdd_tinybytes::BytesString::from_static("op"), + resource: libdd_tinybytes::BytesString::from_static("res"), + r#type: libdd_tinybytes::BytesString::from_static("web"), + start: 1544712660000000000, + duration: 1000000000, + error: 0, + ..Default::default() + }; + span.metrics + .insert(libdd_tinybytes::BytesString::from_static("count"), 42.0); + + let serde_req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let proto: ProtoReq = (&serde_req).into(); + + let rs = &proto.resource_spans[0]; + let sp = &rs.scope_spans[0].spans[0]; + assert_eq!( + sp.trace_id, + vec![0, 0, 0, 0, 0, 0, 0, 0, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0x0C] + ); + assert_eq!( + sp.span_id, + vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x74] + ); + assert_eq!( + sp.parent_span_id, + vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73] + ); + assert_eq!(sp.name, "res"); + assert_eq!(sp.start_time_unix_nano, 1544712660000000000); + assert_eq!(sp.end_time_unix_nano, 1544712661000000000); + let count = sp + .attributes + .iter() + .find(|kv| kv.key == "count") + .expect("count attr"); + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value; + assert!(matches!( + count.value.as_ref().unwrap().value, + Some(Value::IntValue(42)) + )); + } + + // --- hex_to_bytes fallback tests --- + + #[test] + fn hex_to_bytes_wrong_length_returns_zeros() { + // "abc" is 3 chars but we expect 2 bytes (4 chars); should fall back to all-zero. + assert_eq!(hex_to_bytes("abc", 2), vec![0u8; 2]); + } + + #[test] + fn hex_to_bytes_bad_nibble_returns_zeros() { + // "zz" is the right length for 1 byte but contains invalid hex chars. + assert_eq!(hex_to_bytes("zz", 1), vec![0u8; 1]); + } + + // --- Status code + double metric test --- + + #[test] + fn error_span_produces_error_status_and_double_metric() { + // mapper.rs sets status.code = status_code::ERROR when span.error != 0, so + // proto_convert's status_code() must return ProtoStatusCode::Error as i32. + let resource_info = OtlpResourceInfo { + service: "svc-error-test".to_string(), + ..Default::default() + }; + let mut span: Span = Span { + trace_id: 0x1_u128, + span_id: 0x2, + name: libdd_tinybytes::BytesString::from_static("op"), + resource: libdd_tinybytes::BytesString::from_static("res"), + r#type: libdd_tinybytes::BytesString::from_static("web"), + start: 1_000_000_000, + duration: 500_000, + error: 1, // triggers ERROR status in mapper + ..Default::default() + }; + span.metrics + .insert(libdd_tinybytes::BytesString::from_static("ratio"), 1.5_f64); + + let serde_req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let proto: ProtoReq = (&serde_req).into(); + + let sp = &proto.resource_spans[0].scope_spans[0].spans[0]; + + // (a) status code must be ERROR + assert_eq!( + sp.status.as_ref().unwrap().code, + ProtoStatusCode::Error as i32 + ); + + // (b) the "ratio" metric must arrive as a DoubleValue + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value; + let ratio_attr = sp + .attributes + .iter() + .find(|kv| kv.key == "ratio") + .expect("ratio attr must be present"); + assert!( + matches!( + ratio_attr.value.as_ref().unwrap().value, + Some(Value::DoubleValue(v)) if (v - 1.5).abs() < f64::EPSILON + ), + "expected DoubleValue(1.5), got {:?}", + ratio_attr.value + ); + } + + #[test] + fn converts_links_and_events_to_proto() { + use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, SpanEvent, SpanLink}; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value; + use std::collections::HashMap; + + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + // A link carries its own 128-bit trace ID (high<<64 | low) and 64-bit span ID, decoded by + // `link()` via a separate `hex_to_bytes` call than the top-level span IDs. + let span: Span = Span { + trace_id: 0x1_u128, + span_id: 0x2, + name: libdd_tinybytes::BytesString::from_static("op"), + resource: libdd_tinybytes::BytesString::from_static("res"), + r#type: libdd_tinybytes::BytesString::from_static("web"), + start: 1_000_000_000, + duration: 500_000, + span_links: vec![SpanLink { + trace_id: 0x1122334455667788, + trace_id_high: 0x99AABBCCDDEEFF00, + span_id: 0x0102030405060708, + attributes: HashMap::from([( + libdd_tinybytes::BytesString::from_static("link.attr"), + libdd_tinybytes::BytesString::from_static("lv"), + )]), + tracestate: libdd_tinybytes::BytesString::from_static("ts=1"), + flags: 0, + }], + span_events: vec![SpanEvent { + time_unix_nano: 1_700_000_000_000_000_000, + name: libdd_tinybytes::BytesString::from_static("ev"), + attributes: HashMap::from([( + libdd_tinybytes::BytesString::from_static("ev.attr"), + AttributeAnyValue::SingleValue(AttributeArrayValue::String( + libdd_tinybytes::BytesString::from_static("evv"), + )), + )]), + }], + ..Default::default() + }; + + let serde_req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let proto: ProtoReq = (&serde_req).into(); + let sp = &proto.resource_spans[0].scope_spans[0].spans[0]; + + // --- link --- + let link = &sp.links[0]; + assert_eq!( + link.trace_id, + vec![ + 0x99, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88 + ] + ); + assert_eq!( + link.span_id, + vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08] + ); + assert_eq!(link.trace_state, "ts=1"); + let link_attr = link + .attributes + .iter() + .find(|kv| kv.key == "link.attr") + .expect("link attr"); + assert!(matches!( + link_attr.value.as_ref().and_then(|v| v.value.as_ref()), + Some(Value::StringValue(s)) if s == "lv" + )); + + // --- event --- + let event = &sp.events[0]; + assert_eq!(event.time_unix_nano, 1_700_000_000_000_000_000); + assert_eq!(event.name, "ev"); + let event_attr = event + .attributes + .iter() + .find(|kv| kv.key == "ev.attr") + .expect("event attr"); + assert!(matches!( + event_attr.value.as_ref().and_then(|v| v.value.as_ref()), + Some(Value::StringValue(s)) if s == "evv" + )); + } +}