Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 35 additions & 117 deletions libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use libdd_telemetry::worker::{
};
use libdd_trace_utils::{
send_with_retry::{SendWithRetryError, SendWithRetryResult},
span::trace_utils::DroppedStats,
trace_utils::SendDataResult,
};
use std::{collections::HashMap, time::Duration};
Expand Down Expand Up @@ -165,8 +164,6 @@ pub struct SendPayloadTelemetry {
errors_status_code: u64,
bytes_sent: u64,
chunks_sent: u64,
chunks_dropped_p0: u64,
chunks_dropped_by_trace_filter: u64,
chunks_dropped_serialization_error: u64,
chunks_dropped_send_failure: u64,
responses_count_per_code: HashMap<u16, u64>,
Expand Down Expand Up @@ -195,18 +192,8 @@ impl SendPayloadTelemetry {
/// * `value` - The result of sending traces with retry
/// * `bytes_sent` - The number of bytes in the payload
/// * `chunks` - The number of trace chunks in the payload
/// * `dropped_stats` - Trace dropped stats from `stats::process_traces_for_stats`
pub fn from_retry_result(
value: &SendWithRetryResult,
bytes_sent: u64,
chunks: u64,
dropped_stats: DroppedStats,
) -> Self {
let mut telemetry = Self {
chunks_dropped_p0: dropped_stats.dropped_p0_traces as u64,
chunks_dropped_by_trace_filter: dropped_stats.dropped_by_trace_filter as u64,
..Default::default()
};
pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self {
let mut telemetry = Self::default();
match value {
Ok((response, attempts)) => {
telemetry.chunks_sent = chunks;
Expand Down Expand Up @@ -286,18 +273,6 @@ impl TelemetryClient {
self.worker
.add_point(data.chunks_sent as f64, key, vec![])?;
}
if data.chunks_dropped_p0 > 0 {
let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0);
self.worker
.add_point(data.chunks_dropped_p0 as f64, key, vec![])?;
}
if data.chunks_dropped_by_trace_filter > 0 {
let key = self
.metrics
.get(metrics::MetricKind::ChunksDroppedByTraceFilter);
self.worker
.add_point(data.chunks_dropped_by_trace_filter as f64, key, vec![])?;
}
if data.chunks_dropped_serialization_error > 0 {
let key = self
.metrics
Expand All @@ -322,6 +297,27 @@ impl TelemetryClient {
Ok(())
}

/// Reports chunks dropped during client-side stats processing to telemetry.
///
/// # Arguments
///
/// * `dropped_p0_traces` - Count recorded under `MetricKind::ChunksDroppedP0`
/// * `dropped_by_trace_filter` - Count recorded under
///   `MetricKind::ChunksDroppedByTraceFilter`
///
/// A count of zero is skipped: no point is added for that metric.
///
/// # Errors
///
/// Returns early with the error raised by `add_point`; if the P0 point fails,
/// the trace-filter point is not attempted.
pub fn send_client_side_stats_drops(
    &self,
    dropped_p0_traces: usize,
    dropped_by_trace_filter: usize,
) -> Result<(), TelemetryError> {
    // Order matters: P0 drops are reported before trace-filter drops.
    let drops = [
        (metrics::MetricKind::ChunksDroppedP0, dropped_p0_traces),
        (
            metrics::MetricKind::ChunksDroppedByTraceFilter,
            dropped_by_trace_filter,
        ),
    ];
    for (kind, count) in drops {
        if count == 0 {
            continue;
        }
        let key = self.metrics.get(kind);
        self.worker.add_point(count as f64, key, vec![])?;
    }
    Ok(())
}

/// Starts the client
pub async fn start(&self) {
_ = self
Expand Down Expand Up @@ -645,23 +641,22 @@ mod tests {

#[cfg_attr(miri, ignore)]
#[test]
fn chunks_dropped_p0_test() {
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap();
fn send_client_side_stats_drops_test() {
let payload_p0 = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,3\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap();
let payload_trace_filter = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,5\.0\]\],"tags":\["src_library:libdatadog","reason:trace_filters"\],"common":true,"type":"count"#).unwrap();
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
let server = MockServer::start();
let mut telemetry_srv = server.mock(|when, then| {
when.method(POST).body_matches(payload);
when.method(POST)
.body_matches(payload_p0)
.body_matches(payload_trace_filter);
then.status(200).body("");
});
let data = SendPayloadTelemetry {
chunks_dropped_p0: 1,
..Default::default()
};
let (client, handle) = get_test_client(&server.url("/"), &shared_runtime);
shared_runtime
.block_on(async {
client.start().await;
let _ = client.send(&data);
client.send_client_side_stats_drops(3, 5).unwrap();
// Wait for send to be processed
sleep(Duration::from_millis(100)).await;

Expand Down Expand Up @@ -714,16 +709,7 @@ mod tests {
.unwrap(),
3,
));
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
4,
5,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -736,55 +722,14 @@ mod tests {
)
}

#[test]
fn telemetry_from_ok_response_with_p0_drops_test() {
let result = Ok((
http::Response::builder()
.status(http::StatusCode::OK)
.body(Bytes::new())
.unwrap(),
3,
));
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
4,
5,
DroppedStats {
dropped_p0_traces: 10,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
bytes_sent: 4,
chunks_sent: 5,
requests_count: 3,
chunks_dropped_p0: 10,
responses_count_per_code: HashMap::from([(200, 1)]),
..Default::default()
}
)
}

#[test]
fn telemetry_from_request_error_test() {
let error_response = http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body(Bytes::new())
.unwrap();
let result = Err(SendWithRetryError::Http(error_response, 5));
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -803,16 +748,7 @@ mod tests {
HttpError::Network(anyhow::anyhow!("connection refused")),
5,
));
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -827,16 +763,7 @@ mod tests {
#[test]
fn telemetry_from_timeout_error_test() {
let result = Err(SendWithRetryError::Timeout(5));
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -852,16 +779,7 @@ mod tests {
#[test]
fn telemetry_from_build_error_test() {
let result = Err(SendWithRetryError::Build(5));
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand Down
10 changes: 4 additions & 6 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod trace_serializer;

// Re-export the builder
pub use builder::TraceExporterBuilder;
use libdd_trace_utils::{span::trace_utils::DroppedStats, trace_filter::TraceFilterer};
use libdd_trace_utils::trace_filter::TraceFilterer;

use self::agent_response::AgentResponse;
use self::metrics::MetricsEmitter;
Expand Down Expand Up @@ -577,8 +577,6 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
mp_payload: Vec<u8>,
headers: HeaderMap,
chunks: usize,
#[cfg_attr(not(feature = "telemetry"), allow(unused_variables))]
dropped_stats: DroppedStats,
) -> Result<AgentResponse, TraceExporterError> {
let strategy = RetryStrategy::default();
let payload_len = mp_payload.len();
Expand All @@ -599,7 +597,6 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
&result,
payload_len as u64,
chunks as u64,
dropped_stats,
)) {
error!(?e, "Error sending telemetry");
}
Expand All @@ -616,12 +613,14 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra

// Process stats computation and drop non-sampled (p0) chunks.
// This must run before the OTLP path so that unsampled spans are not exported.
let dropped_stats = stats::process_traces_for_stats(
stats::process_traces_for_stats(
&mut traces,
&mut header_tags,
&self.client_side_stats.status,
self.client_computed_top_level,
&self.trace_filterer.load(),
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
self.telemetry.as_ref(),
);

for chunk in &mut traces {
Expand Down Expand Up @@ -674,7 +673,6 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
prepared.data,
prepared.headers,
prepared.chunk_count,
dropped_stats,
)
.await;

Expand Down
33 changes: 20 additions & 13 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,19 @@ fn add_spans_to_stats<T: libdd_trace_utils::span::TraceData>(
}

/// Process traces for stats computation and update header tags accordingly.
/// Returns the number of P0 traces and spans that were dropped.
///
/// If a telemetry client is provided and stats are enabled, the dropped P0 trace
/// count and the trace-filter drop count are sent to telemetry.
pub(crate) fn process_traces_for_stats<T: libdd_trace_utils::span::TraceData>(
traces: &mut Vec<Vec<libdd_trace_utils::span::v04::Span<T>>>,
header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags,
client_side_stats: &ArcSwap<StatsComputationStatus>,
client_computed_top_level: bool,
trace_filterer: &TraceFilterer,
) -> libdd_trace_utils::span::trace_utils::DroppedStats {
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] telemetry: Option<
Comment thread
Eldolfin marked this conversation as resolved.
&crate::telemetry::TelemetryClient,
>,
) {
let status = client_side_stats.load();
if let StatsComputationStatus::Enabled {
stats_concentrator, ..
Expand All @@ -322,22 +327,24 @@ pub(crate) fn process_traces_for_stats<T: libdd_trace_utils::span::TraceData>(
add_spans_to_stats(stats_concentrator, traces);
// Once stats have been computed we can drop all chunks that are not going to be
// sampled by the agent
let mut dropped_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces);
dropped_stats.dropped_by_trace_filter = dropped_by_trace_filter;
let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces);

// Update the headers to indicate that stats have been computed and forward dropped
// traces counts
header_tags.client_computed_top_level = true;
header_tags.client_computed_stats = true;
header_tags.dropped_p0_traces = dropped_stats.dropped_p0_traces;
header_tags.dropped_p0_spans = dropped_stats.dropped_p0_spans;

dropped_stats
} else {
libdd_trace_utils::span::trace_utils::DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
header_tags.dropped_p0_traces = dropped_p0_stats.dropped_p0_traces;
header_tags.dropped_p0_spans = dropped_p0_stats.dropped_p0_spans;

// Send dropped P0 stats directly to telemetry if available
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
if let Some(telemetry_client) = telemetry {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be a good idea to add a test asserting that stats are sent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be a bit heavy. There's a test with http mock in the telemetry component, I don't think it should be up to the caller to check that telemetry correctly sends the telemetry.

if let Err(e) = telemetry_client.send_client_side_stats_drops(
dropped_p0_stats.dropped_p0_traces,
dropped_by_trace_filter,
) {
tracing::error!(?e, "Error sending dropped P0 stats to telemetry");
}
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions libdd-trace-utils/src/span/trace_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ pub fn is_partial_snapshot<T: TraceData>(span: &Span<T>) -> bool {
.is_some_and(|v| *v >= 0.0)
}

pub struct DroppedStats {
pub struct DroppedP0Stats {
pub dropped_p0_traces: usize,
pub dropped_p0_spans: usize,
pub dropped_by_trace_filter: usize,
}

// Keys used for sampling
Expand All @@ -146,7 +145,7 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr";
///
/// # Trace-level attributes
/// Some attributes related to the whole trace are stored in the root span of the chunk.
pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedStats
pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedP0Stats
where
T: TraceData,
{
Expand Down Expand Up @@ -197,10 +196,9 @@ where
true
});

DroppedStats {
DroppedP0Stats {
dropped_p0_traces,
dropped_p0_spans,
dropped_by_trace_filter: 0,
}
}

Expand Down
Loading