Skip to content

feat(dsm): Add support for automatic DSM context extraction inside the extension#1265

Open
jeastham1993 wants to merge 10 commits into
mainfrom
feat/extension-dsm-support
Open

feat(dsm): Add support for automatic DSM context extraction inside the extension#1265
jeastham1993 wants to merge 10 commits into
mainfrom
feat/extension-dsm-support

Conversation

@jeastham1993

@jeastham1993 jeastham1993 commented Jun 18, 2026

Copy link
Copy Markdown

Please include Jira ticket in title.

Overview

Update the trace context propagation support in universal instrumentation to include the automatic extraction of Data Streams Monitoring context for SQS, SNS, Kinesis and EventBridge.

Testing

Added unit tests. Manually tested the functionality for Java, Go and .NET. Java and Go work correctly. The .NET tracer needs updating to support the 2.x version of Amazon.Lambda.RuntimeSupport so can't test that manually yet.

Copilot AI review requested due to automatic review settings June 18, 2026 07:32
@jeastham1993 jeastham1993 requested review from a team as code owners June 18, 2026 07:32
@jeastham1993 jeastham1993 requested a review from lym953 June 18, 2026 07:32
@datadog-official

datadog-official Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Pipelines

Fix all issues with BitsAI

⚠️ Warnings

🚦 4 Pipeline jobs failed

DataDog/datadog-lambda-extension | integration-suite: [auth]   View in Datadog   GitLab

DataDog/datadog-lambda-extension | integration-suite: [lmi]   View in Datadog   GitLab

DataDog/datadog-lambda-extension | integration-suite: [on-demand]   View in Datadog   GitLab

View all 4 failed jobs.

Useful? React with 👍 / 👎

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 9b923a5 | Docs | Datadog PR Page | Give us feedback!

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends Bottlecap’s universal instrumentation trace context propagation to automatically extract and emit Data Streams Monitoring (DSM) consume-side checkpoints within the extension for SQS, SNS, Kinesis, and EventBridge. It adds a DSM aggregation/serialization pipeline that drains into the existing proxy flush path and introduces configuration knobs to enable DSM consume extraction and provide an EventBridge exchange fallback.

Changes:

  • Add a new traces::data_streams module implementing DSM context decode, pathway hashing, checkpoint computation, aggregation, and msgpack+gzip payload generation.
  • Wire an optional DsmProcessor into invocation start processing (universal instrumentation) and into the flushing pipeline so pipeline-stats are shipped via the proxy flusher.
  • Add trigger-specific DSM edge tags for SQS/SNS/Kinesis/EventBridge and introduce DD_DSM_CONSUME_ENABLED / DD_DSM_EXCHANGE_NAME config support.

Reviewed changes

Copilot reviewed 25 out of 26 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
bottlecap/src/traces/mod.rs Exposes the new data_streams module from traces.
bottlecap/src/traces/data_streams/mod.rs DSM module entrypoint; re-exports key DSM types/functions.
bottlecap/src/traces/data_streams/context.rs Implements inbound DSM pathway context decoding (base64 + zigzag varints).
bottlecap/src/traces/data_streams/pathway.rs Implements dd-trace-js-compatible pathway hash computation.
bottlecap/src/traces/data_streams/propagation_hash.rs Implements optional FNV-1 propagation hash.
bottlecap/src/traces/data_streams/checkpoint.rs Computes consume-side checkpoints from extracted context + tags.
bottlecap/src/traces/data_streams/sketch.rs Implements tracer-compatible DDSketch + protobuf bytes serialization.
bottlecap/src/traces/data_streams/aggregator.rs Aggregates checkpoints into 10s buckets and serializes msgpack payloads.
bottlecap/src/traces/data_streams/processor.rs Bridges checkpoint aggregation to proxy flush by gzipping/enqueueing proxy requests.
bottlecap/src/traces/data_streams/fixtures/sketch_golden.json Golden vectors to validate DDSketch compatibility.
bottlecap/src/tags/lambda/tags.rs Updates hardcoded extension version string used in tags/logging.
bottlecap/src/proxy/interceptor.rs Enables universal instrumentation processing when experimental proxy env var is set.
bottlecap/src/lifecycle/invocation/triggers/mod.rs Adds Trigger::get_dsm_edge_tags() default hook.
bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs Adds SQS DSM consume edge tag derivation.
bottlecap/src/lifecycle/invocation/triggers/sns_event.rs Adds SNS DSM consume edge tag derivation.
bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs Adds Kinesis DSM consume edge tag derivation.
bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs Adds EventBridge DSM consume tags + best-effort bus name extraction + tests.
bottlecap/src/lifecycle/invocation/processor.rs Hooks DSM consume recording into universal instrumentation start; adds EventBridge exchange fallback.
bottlecap/src/lifecycle/invocation/processor_service.rs Threads optional DsmProcessor into Processor initialization.
bottlecap/src/flushing/service.rs Drains DSM pipeline-stats into proxy aggregator immediately before proxy flush.
bottlecap/src/config/yaml.rs Adds YAML config fields for DSM consume enabling and exchange fallback.
bottlecap/src/config/mod.rs Adds Config fields: dsm_consume_enabled, dsm_exchange_name.
bottlecap/src/config/env.rs Adds env config parsing/merging for DD_DSM_CONSUME_ENABLED and DD_DSM_EXCHANGE_NAME.
bottlecap/src/bin/bottlecap/main.rs Constructs shared proxy aggregator, conditionally instantiates DSM processor, wires it into services.
bottlecap/Cargo.toml Promotes msgpack/gzip dependencies to main deps and adds serde_bytes.
bottlecap/Cargo.lock Adds serde_bytes to resolved dependency set.

Comment thread bottlecap/src/tags/lambda/tags.rs Outdated
Comment thread bottlecap/src/traces/data_streams/aggregator.rs
Comment thread bottlecap/src/traces/data_streams/processor.rs Outdated
Comment thread bottlecap/src/traces/data_streams/processor.rs
Comment thread bottlecap/src/traces/data_streams/context.rs
Comment thread bottlecap/src/lifecycle/invocation/processor.rs Outdated
Comment thread bottlecap/src/traces/mod.rs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 579f6e6d9b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread bottlecap/src/tags/lambda/tags.rs Outdated
Comment thread bottlecap/src/traces/data_streams/processor.rs
Comment thread bottlecap/src/lifecycle/invocation/processor.rs Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 947380af7b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

let first = segments.next().unwrap_or_default();
// `rule/<bus>/<rule>` => bus is the first segment.
// `rule/<rule>` (default bus) => no second segment, skip.
if segments.next().is_some() && !first.is_empty() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Emit default EventBridge bus exchange

For EventBridge rules on the default bus, AWS rule ARNs are encoded as ...:rule/<rule> with no bus segment; this branch treats that as unknown and omits the exchange tag unless DD_DSM_EXCHANGE_NAME is configured. Default-bus EventBridge Lambda invokes are common, and dropping the bus identity makes the consume checkpoint hash/aggregate differently from the same edge tagged with its bus, breaking DSM continuity for those events. Please treat the one-segment rule ARN as exchange:default.

Useful? React with 👍 / 👎.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ca5b7da286

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread bottlecap/src/bin/bottlecap/main.rs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9b923a55cf

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

// functions whose tracer does not drive the invocation lifecycle.
let experimental_proxy_enabled = std::env::var("DD_EXPERIMENTAL_ENABLE_PROXY")
.is_ok_and(|v| v.eq_ignore_ascii_case("true"));
if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some() || experimental_proxy_enabled {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Drive DSM extraction when the proxy already has the payload

When the runtime API proxy is running for the Datadog wrapper because AppSec is enabled, and DD_DSM_CONSUME_ENABLED=true but no tracer calls /lambda/start-invocation, this condition stays false unless LWA or DD_EXPERIMENTAL_ENABLE_PROXY is also set. In that no-tracer/AppSec setup the event body is available here but lwa::process_invocation_next is skipped, so process_on_universal_instrumentation_start never records any DSM consume checkpoints; include the DSM flag, or all proxy-active wrapper cases, in this gate.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants