Skip to content
1 change: 1 addition & 0 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ regex = { version = "1.10", default-features = false }
reqwest = { version = "0.12.11", features = ["json", "http2"], default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde_bytes = { version = "0.11", default-features = false, features = ["std"] }
# DSM pipeline-stats serialization (msgpack + gzip) for extension-side checkpoints.
rmp-serde = { version = "1.3.1", default-features = false }
flate2 = { version = "1.1", default-features = false, features = ["rust_backend"] }
thiserror = { version = "1.0", default-features = false }
# Transitive dependency (pulled in via cookie). Pinned to >=0.3.47 so cargo audit / CI passes (RUSTSEC-2026-0009).
time = { version = "0.3.47", default-features = false }
Expand Down Expand Up @@ -94,9 +98,6 @@ tower = { version = "0.5", features = ["util"] }
mock_instant = "0.6"
serial_test = "3.1"
tempfile = "3.20"
# fake-intake test harness: decode msgpack+gzip stats payloads on arrival
rmp-serde = { version = "1.3.1", default-features = false }
flate2 = { version = "1.1", default-features = false, features = ["rust_backend"] }

[build-dependencies]
# No external dependencies needed for the build script
Expand Down
35 changes: 34 additions & 1 deletion bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,34 @@ async fn extension_loop_active(
.await;

let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(config)));

// Shared proxy aggregator (used by the trace agent's proxy endpoints and,
// when enabled, the extension-side DSM processor).
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));

// Extension-side Data Streams Monitoring (consume checkpoints), gated by
// DD_DSM_CONSUME_ENABLED.
let dsm_processor = if config.dsm_consume_enabled {
let service = config
.service
.clone()
.or_else(|| tags_provider.get_canonical_resource_name())
.unwrap_or_else(|| "aws.lambda".to_string())
Comment thread
jeastham1993 marked this conversation as resolved.
.to_lowercase();
let env = config.env.clone().unwrap_or_default();
Some(Arc::new(
bottlecap::traces::data_streams::DsmProcessor::new(
service,
env,
env!("CARGO_PKG_VERSION").to_string(),
&config.site,
Arc::clone(&proxy_aggregator),
),
))
} else {
None
};

// Lifecycle Invocation Processor
let (invocation_processor_handle, invocation_processor_service) =
InvocationProcessorService::new(
Expand All @@ -339,6 +367,7 @@ async fn extension_loop_active(
metrics_aggregator_handle.clone(),
Arc::clone(&propagator),
durable_context_tx,
dsm_processor.clone(),
);
tokio::spawn(async move {
invocation_processor_service.run().await;
Expand Down Expand Up @@ -372,6 +401,7 @@ async fn extension_loop_active(
invocation_processor_handle.clone(),
appsec_processor.clone(),
&shared_client,
Arc::clone(&proxy_aggregator),
);

let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy(
Expand Down Expand Up @@ -429,6 +459,7 @@ async fn extension_loop_active(
let stats_flusher_clone = Arc::clone(&stats_flusher);
let proxy_flusher_clone = proxy_flusher.clone();
let metrics_aggr_handle_clone = metrics_aggregator_handle.clone();
let dsm_processor_clone = dsm_processor.clone();

// In Managed Instance mode, create a separate interval for the background flusher task.
// We don't reuse race_flush_interval because we need to configure the missed tick
Expand Down Expand Up @@ -459,6 +490,7 @@ async fn extension_loop_active(
proxy_flusher_clone,
metrics_flushers_clone,
metrics_aggr_handle_clone,
dsm_processor_clone,
);

loop {
Expand Down Expand Up @@ -633,6 +665,7 @@ async fn extension_loop_active(
proxy_flusher.clone(),
Arc::clone(&metrics_flushers),
metrics_aggregator_handle.clone(),
dsm_processor.clone(),
);
handle_next_invocation(next_lambda_response, &invocation_processor_handle).await;
loop {
Expand Down Expand Up @@ -1103,6 +1136,7 @@ fn start_trace_agent(
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
client: &Client,
proxy_aggregator: Arc<TokioMutex<proxy_aggregator::Aggregator>>,
) -> (
Sender<SendDataBuilderInfo>,
Arc<trace_flusher::TraceFlusher>,
Expand Down Expand Up @@ -1167,7 +1201,6 @@ fn start_trace_agent(
tokio::spawn(span_dedup_service.run());

// Proxy
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));
let proxy_flusher = Arc::new(ProxyFlusher::new(
api_key_factory.clone(),
Arc::clone(&proxy_aggregator),
Expand Down
17 changes: 17 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,17 @@ pub struct EnvConfig {
/// Enable the new AWS-resource naming logic in the tracer.
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub trace_aws_service_representation_enabled: Option<bool>,
/// @env `DD_DSM_CONSUME_ENABLED`
///
/// Enable extension-side Data Streams Monitoring consume checkpoints. When
/// enabled, the extension extracts inbound DSM pathway context from event
/// payloads and emits `direction:in` checkpoints itself.
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub dsm_consume_enabled: Option<bool>,
/// @env `DD_DSM_EXCHANGE_NAME`
pub dsm_exchange_name: Option<String>,
/// @env `DD_DSM_KAFKA_GROUP`
pub dsm_kafka_group: Option<String>,
//
// Trace Propagation
/// @env `DD_TRACE_PROPAGATION_STYLE`
Expand Down Expand Up @@ -582,6 +593,9 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option!(config, env_config, apm_filter_tags_regex_require);
merge_option!(config, env_config, apm_filter_tags_regex_reject);
merge_option_to_value!(config, env_config, trace_aws_service_representation_enabled);
merge_option_to_value!(config, env_config, dsm_consume_enabled);
merge_option!(config, env_config, dsm_exchange_name);
merge_option!(config, env_config, dsm_kafka_group);

// Trace Propagation
merge_vec!(config, env_config, trace_propagation_style);
Expand Down Expand Up @@ -1027,6 +1041,9 @@ mod tests {
trace_propagation_extract_first: true,
trace_propagation_http_baggage_enabled: true,
trace_aws_service_representation_enabled: true,
dsm_consume_enabled: false,
dsm_exchange_name: None,
dsm_kafka_group: None,
metrics_config_compression_level: 3,
otlp_config_traces_enabled: false,
otlp_config_traces_span_name_as_resource_name: true,
Expand Down
14 changes: 14 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,17 @@ pub struct Config {
pub trace_propagation_http_baggage_enabled: bool,
pub trace_aws_service_representation_enabled: bool,

// Data Streams Monitoring
/// Enable extension-side DSM consume checkpoints (`DD_DSM_CONSUME_ENABLED`).
pub dsm_consume_enabled: bool,
/// Fallback DSM `exchange` (event bus name) used for `EventBridge` consume
/// checkpoints when it cannot be derived from the event payload
/// (`DD_DSM_EXCHANGE_NAME`).
pub dsm_exchange_name: Option<String>,
/// Consumer group used for `MSK`/Kafka DSM consume checkpoints, which is not
/// present in the Lambda event payload (`DD_DSM_KAFKA_GROUP`).
pub dsm_kafka_group: Option<String>,

// Metrics
pub metrics_config_compression_level: i32,
pub statsd_metric_namespace: Option<String>,
Expand Down Expand Up @@ -431,6 +442,9 @@ impl Default for Config {
apm_filter_tags_regex_require: None,
apm_filter_tags_regex_reject: None,
trace_aws_service_representation_enabled: true,
dsm_consume_enabled: false,
dsm_exchange_name: None,
dsm_kafka_group: None,
trace_propagation_style: vec![
TracePropagationStyle::Datadog,
TracePropagationStyle::TraceContext,
Expand Down
10 changes: 10 additions & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ pub struct YamlConfig {
pub service_mapping: HashMap<String, String>,
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub trace_aws_service_representation_enabled: Option<bool>,
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub dsm_consume_enabled: Option<bool>,
pub dsm_exchange_name: Option<String>,
pub dsm_kafka_group: Option<String>,
// Trace Propagation
#[serde(deserialize_with = "deserialize_trace_propagation_style")]
pub trace_propagation_style: Vec<TracePropagationStyle>,
Expand Down Expand Up @@ -553,6 +557,9 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) {
yaml_config,
trace_aws_service_representation_enabled
);
merge_option_to_value!(config, yaml_config, dsm_consume_enabled);
merge_option!(config, yaml_config, dsm_exchange_name);
merge_option!(config, yaml_config, dsm_kafka_group);

// OTLP
if let Some(otlp_config) = &yaml_config.otlp_config {
Expand Down Expand Up @@ -975,6 +982,9 @@ api_security_sample_delay: 60 # Seconds
trace_propagation_extract_first: true,
trace_propagation_http_baggage_enabled: true,
trace_aws_service_representation_enabled: true,
dsm_consume_enabled: false,
dsm_exchange_name: None,
dsm_kafka_group: None,
metrics_config_compression_level: 3,
otlp_config_traces_enabled: false,
otlp_config_traces_span_name_as_resource_name: true,
Expand Down
17 changes: 17 additions & 0 deletions bottlecap/src/flushing/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub struct FlushingService {
proxy_flusher: Arc<ProxyFlusher>,
metrics_flushers: Arc<Vec<MetricsFlusher>>,

/// Optional extension-side DSM processor. When present, its aggregated
/// pipeline-stats payload is drained into the proxy aggregator immediately
/// before each proxy flush. `None` unless `DD_DSM_CONSUME_ENABLED` is set.
dsm_processor: Option<Arc<crate::traces::data_streams::DsmProcessor>>,

// Metrics aggregator handle for getting data to flush
metrics_aggr_handle: MetricsAggregatorHandle,

Expand All @@ -46,13 +51,15 @@ impl FlushingService {
proxy_flusher: Arc<ProxyFlusher>,
metrics_flushers: Arc<Vec<MetricsFlusher>>,
metrics_aggr_handle: MetricsAggregatorHandle,
dsm_processor: Option<Arc<crate::traces::data_streams::DsmProcessor>>,
) -> Self {
Self {
logs_flusher,
trace_flusher,
stats_flusher,
proxy_flusher,
metrics_flushers,
dsm_processor,
metrics_aggr_handle,
handles: FlushHandles::new(),
}
Expand Down Expand Up @@ -123,6 +130,11 @@ impl FlushingService {
sf.flush(false, None).await.unwrap_or_default()
}));

// Drain DSM pipeline stats into the proxy aggregator before flushing.
if let Some(dsm) = &self.dsm_processor {
dsm.drain_into_proxy().await;
}

// Spawn proxy flush
let pf = self.proxy_flusher.clone();
self.handles
Expand Down Expand Up @@ -324,6 +336,11 @@ impl FlushingService {
})
.collect();

// Drain DSM pipeline stats into the proxy aggregator before flushing.
if let Some(dsm) = &self.dsm_processor {
dsm.drain_into_proxy().await;
}

tokio::join!(
self.logs_flusher.flush(None),
futures::future::join_all(metrics_futures),
Expand Down
Loading
Loading