From 813e3577f677b3ed50bf340cfeca0068e960d4ae Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Mon, 15 Jun 2026 21:04:30 +0200 Subject: [PATCH 1/4] Extract one_way_shared_memory to IPC and prepare libdd-remote-config for python Signed-off-by: Bob Weinand --- Cargo.lock | 1 + datadog-ipc/Cargo.toml | 3 + datadog-ipc/src/lib.rs | 1 + datadog-ipc/src/one_way_shared_memory.rs | 448 ++++++++++++++++++ datadog-sidecar/src/agent_remote_config.rs | 25 +- datadog-sidecar/src/lib.rs | 1 - datadog-sidecar/src/one_way_shared_memory.rs | 245 ---------- datadog-sidecar/src/service/agent_info.rs | 8 +- datadog-sidecar/src/service/telemetry.rs | 2 +- datadog-sidecar/src/shm_remote_config.rs | 20 +- libdd-remote-config/Cargo.toml | 3 + libdd-remote-config/src/fetch/fetcher.rs | 4 +- libdd-remote-config/src/fetch/single.rs | 21 + .../src/file_change_tracker.rs | 56 ++- libdd-remote-config/src/lib.rs | 4 + libdd-remote-config/src/path.rs | 12 + 16 files changed, 568 insertions(+), 286 deletions(-) create mode 100644 datadog-ipc/src/one_way_shared_memory.rs delete mode 100644 datadog-sidecar/src/one_way_shared_memory.rs diff --git a/Cargo.lock b/Cargo.lock index 1b328536d5..3071bc8b81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3269,6 +3269,7 @@ dependencies = [ "libdd-remote-config", "libdd-trace-protobuf", "manual_future", + "pyo3", "serde", "serde_json", "serde_with", diff --git a/datadog-ipc/Cargo.toml b/datadog-ipc/Cargo.toml index d528f146fb..3d9f9f7888 100644 --- a/datadog-ipc/Cargo.toml +++ b/datadog-ipc/Cargo.toml @@ -68,3 +68,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(polyfill_glibc_memfd)'] } [features] tiny-bytes = ["libdd-tinybytes"] +# Wake `OneWayShmReader::wait_for_change` via a futex on the generation word +# (Linux only). Without this, the wait falls back to a timed poll. +one_way_shm_futex = [] diff --git a/datadog-ipc/src/lib.rs b/datadog-ipc/src/lib.rs index 2419e16729..5204784703 100644 --- a/datadog-ipc/src/lib.rs +++ b/datadog-ipc/src/lib.rs @@ -10,6 +10,7 @@ pub mod example_interface; pub mod handles; +pub mod one_way_shared_memory; pub mod platform; pub mod rate_limiter; pub mod shm_stats; diff --git a/datadog-ipc/src/one_way_shared_memory.rs b/datadog-ipc/src/one_way_shared_memory.rs new file mode 100644 index 0000000000..79e5600b40 --- /dev/null +++ b/datadog-ipc/src/one_way_shared_memory.rs @@ -0,0 +1,448 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! A single-writer / multiple-reader shared-memory channel. +//! +//! A writer publishes an opaque byte buffer into a shared-memory segment; any +//! number of readers (typically forked child processes that inherit the +//! mapping, or processes that open the same named segment) observe the latest +//! buffer. Consistency is provided by a generation counter (odd = mid-write, +//! even = stable) and a double-copy with an acquire fence. +//! +//! [`OneWayShmReader::wait_for_change`] lets a reader block until the writer +//! publishes new data, rather than busy-polling. On Linux this is a `futex` +//! wait/wake on the low 32 bits of the shared generation counter — an +//! inexpensive, signal-free cross-process notification. Because the generation +//! only ever increments, its low 32 bits are a sufficient wait word (no separate +//! notify field is needed). On other platforms the wait degrades to a timed +//! sleep so callers effectively poll. The wait always takes a timeout, so +//! callers still get periodic wakeups even when the data is unchanged. + +use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle, ShmHandle}; +use libdd_common::MutexExt; +use std::ffi::{CStr, CString}; +use std::io; +use std::sync::atomic::{fence, AtomicU64, Ordering}; +use std::sync::Mutex; +use std::time::Duration; + +pub struct OneWayShmWriter +where + T: FileBackedHandle + From>, +{ + handle: Mutex>, +} + +pub struct OneWayShmReader +where + T: FileBackedHandle + From>, +{ + handle: Option>, + current_data: Option>, + last_wait_generation: u32, + // Optional re-opener: lazily (re)maps the segment when the handle is absent + // (e.g. a named segment opened by path). Readers over an inherited anonymous + // mapping leave this `None`. A fn pointer keeps the reader `Send`/`Sync` + // without a trait impl (which would be orphan-illegal for foreign `D`). + opener: Option Option>>, + pub extra: D, +} + +#[repr(C)] +#[derive(Debug)] +struct RawMetaData { + generation: AtomicU64, + size: usize, +} + +#[repr(C)] +#[derive(Debug)] +struct RawData { + meta: RawMetaData, + buf: [u8], +} + +impl RawData { + fn as_slice(&self) -> &[u8] { + // Safety: size is expected to be truthful + unsafe { std::slice::from_raw_parts(self.buf.as_ptr(), self.meta.size) } + } + + fn as_slice_mut(&mut self) -> &mut [u8] { + // Safety: size is expected to be truthful + unsafe { std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.meta.size) } + } +} + +impl From<&[u64]> for &RawData { + fn from(value: &[u64]) -> Self { + // Safety: MappedMem is supposed to be big enough + // Safety: u64 is aligned + unsafe { &*(value as *const [u64] as *const RawData) } + } +} + +// Safety: Caller needs to ensure the u8 is 8 byte aligned +unsafe fn reinterpret_u8_as_u64_slice(slice: &[u8]) -> &[u64] { + // Safety: given 8 byte alignment, it's guaranteed to be readable + std::slice::from_raw_parts(slice.as_ptr() as *const u64, slice.len().div_ceil(8)) +} + +// The `futex`-based wakeup is gated behind the `one_way_shm_futex` feature (and +// Linux, where cross-process `futex` on shared memory is supported). When it is +// disabled, `wait_for_change` falls back to a timed sleep (callers poll) and +// `write` skips the wake — useful for consumers (e.g. the sidecar) that notify +// out of band. +// +// `addr` points to the 32-bit wait word (the low 32 bits of the generation +// counter). It must be 4-byte aligned and live in shared memory. +#[cfg(all(feature = "one_way_shm_futex", target_os = "linux"))] +fn futex_wake(addr: *const u32) { + // FUTEX_WAKE (non-private) on a shared mapping wakes waiters across + // processes. i32::MAX => wake all waiters. + unsafe { + libc::syscall(libc::SYS_futex, addr, libc::FUTEX_WAKE, i32::MAX); + } +} + +#[cfg(all(feature = "one_way_shm_futex", target_os = "linux"))] +fn futex_wait(addr: *const u32, expected: u32, timeout: Duration) { + let ts = libc::timespec { + tv_sec: timeout.as_secs() as libc::time_t, + tv_nsec: timeout.subsec_nanos() as libc::c_long, + }; + // FUTEX_WAIT atomically checks `*addr == expected` and sleeps if so; returns + // immediately (EAGAIN) otherwise. Spurious wakeups are fine — the caller + // re-checks the generation. + unsafe { + libc::syscall( + libc::SYS_futex, + addr, + libc::FUTEX_WAIT, + expected as libc::c_int, + &ts as *const libc::timespec, + ); + } +} + +#[cfg(not(all(feature = "one_way_shm_futex", target_os = "linux")))] +fn futex_wake(_addr: *const u32) {} + +#[cfg(not(all(feature = "one_way_shm_futex", target_os = "linux")))] +fn futex_wait(_addr: *const u32, _expected: u32, timeout: Duration) { + // No futex (feature disabled or unsupported platform); sleep so callers poll + // the generation at the requested cadence. + std::thread::sleep(timeout); +} + +/// Create a writer backed by a fresh anonymous shared-memory segment, returning +/// the writer and a clonable [`ShmHandle`] to the same segment. +/// +/// The handle can be mapped by readers in the same process or inherited by +/// forked children (an anonymous mapping survives `fork`), letting them build a +/// [`OneWayShmReader`] over what this writer publishes. The segment starts at one +/// page and grows on demand as larger buffers are written. Use +/// [`OneWayShmWriter::new`] instead when readers attach by name rather than +/// inheriting the mapping. +pub fn create_anon_pair() -> anyhow::Result<(OneWayShmWriter, ShmHandle)> { + let handle = ShmHandle::new(0x1000)?; + Ok(( + OneWayShmWriter { + handle: Mutex::new(handle.clone().map()?), + }, + handle, + )) +} + +impl>, D> OneWayShmReader { + /// Create a reader over an already-mapped segment. + /// + /// `handle` is the live mapping to read from — typically an anonymous segment + /// inherited across a `fork`. Passing `None` leaves the reader without a + /// mapping; since this constructor installs no opener, such a reader stays + /// inert (empty reads, `wait_for_change` sleeps) until a handle is supplied — + /// use [`Self::new_with_opener`] when the segment should be opened lazily. + /// `extra` is arbitrary caller state carried alongside the reader. + pub fn new(handle: Option>, extra: D) -> OneWayShmReader { + OneWayShmReader { + handle, + current_data: None, + last_wait_generation: 0, + opener: None, + extra, + } + } + + /// Like [`Self::new`], but with a re-opener used to (re)map the segment when + /// the handle is absent (typically a named segment opened from `extra`). + pub fn new_with_opener( + handle: Option>, + extra: D, + opener: fn(&D) -> Option>, + ) -> OneWayShmReader { + OneWayShmReader { + handle, + current_data: None, + last_wait_generation: 0, + opener: Some(opener), + extra, + } + } + + fn try_open(&self) -> Option> { + self.opener.and_then(|open| open(&self.extra)) + } + + /// Returns the generation of the last successfully read data, or 0 if nothing has been read. + pub fn last_read_generation(&self) -> u64 { + self.current_data + .as_ref() + .map(|d| { + let source_data: &RawData = d.as_slice().into(); + source_data.meta.generation.load(Ordering::Acquire) + }) + .unwrap_or(0) + } +} + +impl>> OneWayShmWriter { + /// Create a writer backed by a named shared-memory segment at `path`. + /// + /// The segment is created and mapped under the given name so that unrelated + /// processes can attach to it by opening the same path (see + /// [`open_named_shm`]). Prefer [`create_anon_pair`] when readers inherit the + /// mapping across a `fork` rather than opening it by name. + pub fn new(path: CString) -> io::Result> { + Ok(OneWayShmWriter { + handle: Mutex::new(NamedShmHandle::create(path, 0x1000)?.map()?), + }) + } +} + +pub fn open_named_shm(path: &CStr) -> io::Result> { + NamedShmHandle::open(path)?.map() +} + +fn skip_last_byte(slice: &[u8]) -> &[u8] { + if slice.is_empty() { + slice + } else { + &slice[..slice.len() - 1] + } +} + +impl>, D> OneWayShmReader { + /// Read the latest published buffer. + /// + /// Returns `(changed, data)`. `changed` is `true` only when the writer + /// published a newer generation than the previous `read` returned, so callers + /// can cheaply skip re-parsing unchanged data. `data` always points at the + /// most recent stable buffer (empty before the first successful read). A write + /// observed mid-flight (odd generation) or one that races the double-copy + /// yields the previously-read buffer with `changed == false`; the next `read` + /// picks it up. If the reader has no mapping yet, it is opened lazily via the + /// opener installed by [`Self::new_with_opener`] (if any). + pub fn read<'a>(&'a mut self) -> (bool, &'a [u8]) { + if let Some(ref handle) = self.handle { + let source_data: &RawData = + unsafe { reinterpret_u8_as_u64_slice(handle.as_slice()) }.into(); + let new_generation = source_data.meta.generation.load(Ordering::Acquire); + + let fetch_data = |reader: &'a mut OneWayShmReader| { + let size = std::mem::size_of::() + source_data.meta.size; + + #[allow(clippy::unwrap_used)] + let handle = reader.handle.as_mut().unwrap(); + handle.ensure_space(size); + + // aligned on 8 byte boundary, round up to closest 8 byte boundary + let mut new_mem = Vec::::with_capacity(size.div_ceil(8)); + new_mem.extend_from_slice(unsafe { + reinterpret_u8_as_u64_slice(&handle.as_slice()[0..size]) + }); + + // refetch, might have been resized + let source_data: &RawData = + unsafe { reinterpret_u8_as_u64_slice(handle.as_slice()) }.into(); + let copied_data: &RawData = new_mem.as_slice().into(); + + // Ensure a new write hasn't started yet + fence(Ordering::Acquire); // prevent loads before from being reordered with gen load after + if new_generation == source_data.meta.generation.load(Ordering::Relaxed) { + reader.current_data.replace(new_mem); + return Some((true, skip_last_byte(copied_data.as_slice()))); + } + None + }; + + if let Some(cur_mem) = &self.current_data { + let cur_data: &RawData = cur_mem.as_slice().into(); + + if new_generation & 1 == 1 { + // mid-write + return (false, skip_last_byte(cur_data.as_slice())); + } + + if new_generation > cur_data.meta.generation.load(Ordering::Relaxed) { + if let Some(success) = fetch_data(self) { + return success; + } + } + + return (false, skip_last_byte(cur_data.as_slice())); + } else { + // first read + + if new_generation & 1 == 1 { + // mid-write + return (false, "".as_bytes()); + } + + if let Some(success) = fetch_data(self) { + return success; + } + } + } else if let Some(handle) = self.try_open() { + self.handle.replace(handle); + return self.read(); + } + + (false, "".as_bytes()) + } + + /// Block until the writer publishes new data (advances the generation + /// counter) or `timeout` elapses. Returns `true` if the generation advanced + /// since the previous call, `false` on timeout. + /// + /// On Linux this is a `futex` wait on the low 32 bits of the shared + /// generation counter; elsewhere it degrades to a `timeout` sleep (the caller + /// then polls via [`Self::read`]). + pub fn wait_for_change(&mut self, timeout: Duration) -> bool { + if self.handle.is_none() { + if let Some(handle) = self.try_open() { + self.handle.replace(handle); + } else { + std::thread::sleep(timeout); + return false; + } + } + + // Raw pointer to the generation atomic inside the live mapping. It lives + // in the first page (before the resizable buffer), so its address is + // stable for the duration of the wait. `wait_for_change` and `read` are + // only ever called from the same (reader) thread, so no concurrent remap + // can invalidate this pointer. + let generation_ptr: *const AtomicU64 = { + let Some(ref handle) = self.handle else { + return false; + }; + let data: &RawData = unsafe { reinterpret_u8_as_u64_slice(handle.as_slice()) }.into(); + &data.meta.generation as *const AtomicU64 + }; + // Safety: see comment above — the mapping outlives this call. + let generation = unsafe { &*generation_ptr }; + // The futex wait word is the low 32 bits of the generation counter (its + // incremental nature makes 32 bits sufficient). On little-endian targets + // (the only ones we futex on) that is the start of the 8-byte counter. + let wait_word = generation_ptr.cast::(); + + let current = generation.load(Ordering::Acquire) as u32; + if current != self.last_wait_generation { + self.last_wait_generation = current; + return true; + } + + futex_wait(wait_word, current, timeout); + + let after = generation.load(Ordering::Acquire) as u32; + let changed = after != self.last_wait_generation; + self.last_wait_generation = after; + changed + } + + /// Drop the current mapping. + /// + /// A subsequent [`Self::read`] or [`Self::wait_for_change`] re-maps the + /// segment through the opener installed by [`Self::new_with_opener`] (if any); + /// without an opener the reader becomes inert until a new handle is supplied. + /// Useful when the backing segment may have been replaced and must be reopened + /// from scratch. + pub fn clear_reader(&mut self) { + self.handle.take(); + } +} + +impl>> OneWayShmWriter { + /// Publish `contents` as the new current buffer, replacing the previous one. + /// + /// Writers are single-producer: the generation counter is bumped to odd + /// before the copy and back to even afterwards (with release ordering) so a + /// reader never observes a torn buffer — one racing the write either retries + /// or keeps its prior copy. The segment grows if `contents` doesn't fit, and a + /// trailing NUL is appended (to keep C consumers happy) that is not part of + /// the data readers see. When built with the `one_way_shm_futex` feature on + /// Linux this also wakes readers blocked in + /// [`OneWayShmReader::wait_for_change`]; the wake is a cheap no-op syscall when + /// there are no waiters. + pub fn write(&self, contents: &[u8]) { + let mut mapped = self.handle.lock_or_panic(); + + let size = contents.len() + 1; // trailing zero byte, to keep some C code happy + mapped.ensure_space(std::mem::size_of::() + size); + + // Safety: ShmHandle is always big enough + // Actually &mut mapped.as_slice_mut() as RawData seems safe, but unsized locals are + // unstable + let data = unsafe { &mut *(mapped.as_slice_mut() as *mut [u8] as *mut RawData) }; + data.meta.generation.fetch_add(1, Ordering::AcqRel); + data.meta.size = size; + + data.as_slice_mut()[0..contents.len()].copy_from_slice(contents); + data.as_slice_mut()[contents.len()] = 0; + + data.meta.generation.fetch_add(1, Ordering::Release); + + // Wake any readers blocked in `wait_for_change` on the generation word. + // A wake with no waiters is a cheap no-op syscall. + futex_wake((&data.meta.generation as *const AtomicU64).cast::()); + } + + /// Borrow the buffer currently published in the segment (excluding the + /// trailing NUL), or an empty slice if nothing has been written yet. + /// + /// This reads the writer's own mapping directly and performs no + /// generation/consistency handshake — unlike [`OneWayShmReader::read`] — so + /// only call it from the writing side where no concurrent `write` is in + /// flight. + pub fn as_slice(&self) -> &[u8] { + let mapped = self.handle.lock_or_panic(); + let data = unsafe { &*(mapped.as_slice() as *const [u8] as *const RawData) }; + if data.meta.size > 0 { + let slice = data.as_slice(); + &slice[..slice.len() - 1] // ignore the trailing zero + } else { + b"" + } + } + + /// The size in bytes of the writer's current mapping. + /// + /// This is the full mapped region (metadata header plus any slack left by + /// growth), not the length of the published payload — use [`Self::as_slice`] + /// for the latter. + pub fn size(&self) -> usize { + self.handle.lock_or_panic().as_slice().len() + } + + /// The current value of the segment's generation counter. + /// + /// It advances on every [`Self::write`] (by two per completed write: odd while + /// writing, even when stable), so it is mainly useful for diagnostics and + /// tests. Its low 32 bits double as the `futex` wait word readers block on in + /// [`OneWayShmReader::wait_for_change`]. + pub fn current_generation(&self) -> u64 { + let mapped = self.handle.lock_or_panic(); + let data = unsafe { &*(mapped.as_slice() as *const [u8] as *const RawData) }; + data.meta.generation.load(Ordering::Acquire) + } +} diff --git a/datadog-sidecar/src/agent_remote_config.rs b/datadog-sidecar/src/agent_remote_config.rs index b01468c56a..70590a84d0 100644 --- a/datadog-sidecar/src/agent_remote_config.rs +++ b/datadog-sidecar/src/agent_remote_config.rs @@ -1,10 +1,8 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::one_way_shared_memory::{ - open_named_shm, OneWayShmReader, OneWayShmWriter, ReaderOpener, -}; use crate::primary_sidecar_identifier; +use datadog_ipc::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter}; use datadog_ipc::platform::{FileBackedHandle, MappedMem, NamedShmHandle, ShmHandle}; use libdd_common::Endpoint; use std::ffi::CString; @@ -38,7 +36,7 @@ fn path_for_endpoint(endpoint: &Endpoint) -> CString { } pub fn create_anon_pair() -> anyhow::Result<(AgentRemoteConfigWriter, ShmHandle)> { - let (writer, handle) = crate::one_way_shared_memory::create_anon_pair()?; + let (writer, handle) = datadog_ipc::one_way_shared_memory::create_anon_pair()?; Ok((AgentRemoteConfigWriter(writer), handle)) } @@ -61,9 +59,10 @@ fn try_open_shm(endpoint: &Endpoint) -> Option> { } pub fn new_reader(endpoint: &Endpoint) -> AgentRemoteConfigReader { - AgentRemoteConfigReader(OneWayShmReader::new( + AgentRemoteConfigReader(OneWayShmReader::new_with_opener( try_open_shm(endpoint), Some(AgentRemoteConfigEndpoint(endpoint.clone())), + |extra| extra.as_ref().and_then(|endpoint| try_open_shm(&endpoint.0)), )) } @@ -80,22 +79,8 @@ pub fn new_writer(endpoint: &Endpoint) -> io::Result for OneWayShmReader> {} -impl ReaderOpener - for OneWayShmReader> -{ - fn open(&self) -> Option> { - self.extra - .as_ref() - .and_then(|endpoint| try_open_shm(&endpoint.0)) - } -} - -impl>> AgentRemoteConfigReader -where - OneWayShmReader>: ReaderOpener, -{ +impl>> AgentRemoteConfigReader { pub fn read(&mut self) -> (bool, &[u8]) { self.0.read() } diff --git a/datadog-sidecar/src/lib.rs b/datadog-sidecar/src/lib.rs index c1996fc117..2c22b02644 100644 --- a/datadog-sidecar/src/lib.rs +++ b/datadog-sidecar/src/lib.rs @@ -14,7 +14,6 @@ mod dump; pub mod entry; #[cfg(feature = "tracing")] pub mod log; -pub mod one_way_shared_memory; mod self_telemetry; pub mod setup; pub mod shm_remote_config; diff --git a/datadog-sidecar/src/one_way_shared_memory.rs b/datadog-sidecar/src/one_way_shared_memory.rs deleted file mode 100644 index 84e41fef63..0000000000 --- a/datadog-sidecar/src/one_way_shared_memory.rs +++ /dev/null @@ -1,245 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use datadog_ipc::platform::{FileBackedHandle, MappedMem, NamedShmHandle, ShmHandle}; -use libdd_common::MutexExt; -use std::ffi::{CStr, CString}; -use std::io; -use std::sync::atomic::{fence, AtomicU64, Ordering}; -use std::sync::Mutex; - -pub struct OneWayShmWriter -where - T: FileBackedHandle + From>, -{ - handle: Mutex>, -} - -pub struct OneWayShmReader -where - T: FileBackedHandle + From>, -{ - handle: Option>, - current_data: Option>, - pub extra: D, -} - -#[repr(C)] -#[derive(Debug)] -struct RawMetaData { - generation: AtomicU64, - size: usize, -} - -#[repr(C)] -#[derive(Debug)] -struct RawData { - meta: RawMetaData, - buf: [u8], -} - -impl RawData { - fn as_slice(&self) -> &[u8] { - // Safety: size is expected to be truthful - unsafe { std::slice::from_raw_parts(self.buf.as_ptr(), self.meta.size) } - } - - fn as_slice_mut(&mut self) -> &mut [u8] { - // Safety: size is expected to be truthful - unsafe { std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.meta.size) } - } -} - -impl From<&[u64]> for &RawData { - fn from(value: &[u64]) -> Self { - // Safety: MappedMem is supposed to be big enough - // Safety: u64 is aligned - unsafe { &*(value as *const [u64] as *const RawData) } - } -} - -// Safety: Caller needs to ensure the u8 is 8 byte aligned -unsafe fn reinterpret_u8_as_u64_slice(slice: &[u8]) -> &[u64] { - // Safety: given 8 byte alignment, it's guaranteed to be readable - std::slice::from_raw_parts(slice.as_ptr() as *const u64, slice.len().div_ceil(8)) -} - -pub fn create_anon_pair() -> anyhow::Result<(OneWayShmWriter, ShmHandle)> { - let handle = ShmHandle::new(0x1000)?; - Ok(( - OneWayShmWriter { - handle: Mutex::new(handle.clone().map()?), - }, - handle, - )) -} - -impl>, D> OneWayShmReader { - pub fn new(handle: Option>, extra: D) -> OneWayShmReader { - OneWayShmReader { - handle, - current_data: None, - extra, - } - } - - /// Returns the generation of the last successfully read data, or 0 if nothing has been read. - pub fn last_read_generation(&self) -> u64 { - self.current_data - .as_ref() - .map(|d| { - let source_data: &RawData = d.as_slice().into(); - source_data.meta.generation.load(Ordering::Acquire) - }) - .unwrap_or(0) - } -} - -impl>> OneWayShmWriter { - pub fn new(path: CString) -> io::Result> { - Ok(OneWayShmWriter { - handle: Mutex::new(NamedShmHandle::create(path, 0x1000)?.map()?), - }) - } -} - -pub trait ReaderOpener -where - T: FileBackedHandle, -{ - fn open(&self) -> Option> { - None - } -} - -pub fn open_named_shm(path: &CStr) -> io::Result> { - NamedShmHandle::open(path)?.map() -} - -fn skip_last_byte(slice: &[u8]) -> &[u8] { - if slice.is_empty() { - slice - } else { - &slice[..slice.len() - 1] - } -} - -impl>, D> OneWayShmReader -where - OneWayShmReader: ReaderOpener, -{ - // bool is true when it changed - pub fn read<'a>(&'a mut self) -> (bool, &'a [u8]) { - if let Some(ref handle) = self.handle { - let source_data: &RawData = - unsafe { reinterpret_u8_as_u64_slice(handle.as_slice()) }.into(); - let new_generation = source_data.meta.generation.load(Ordering::Acquire); - - let fetch_data = |reader: &'a mut OneWayShmReader| { - let size = std::mem::size_of::() + source_data.meta.size; - - #[allow(clippy::unwrap_used)] - let handle = reader.handle.as_mut().unwrap(); - handle.ensure_space(size); - - // aligned on 8 byte boundary, round up to closest 8 byte boundary - let mut new_mem = Vec::::with_capacity(size.div_ceil(8)); - new_mem.extend_from_slice(unsafe { - reinterpret_u8_as_u64_slice(&handle.as_slice()[0..size]) - }); - - // refetch, might have been resized - let source_data: &RawData = - unsafe { reinterpret_u8_as_u64_slice(handle.as_slice()) }.into(); - let copied_data: &RawData = new_mem.as_slice().into(); - - // Ensure a new write hasn't started yet - fence(Ordering::Acquire); // prevent loads before from being reordered with gen load after - if new_generation == source_data.meta.generation.load(Ordering::Relaxed) { - reader.current_data.replace(new_mem); - return Some((true, skip_last_byte(copied_data.as_slice()))); - } - None - }; - - if let Some(cur_mem) = &self.current_data { - let cur_data: &RawData = cur_mem.as_slice().into(); - - if new_generation & 1 == 1 { - // mid-write - return (false, skip_last_byte(cur_data.as_slice())); - } - - if new_generation > cur_data.meta.generation.load(Ordering::Relaxed) { - if let Some(success) = fetch_data(self) { - return success; - } - } - - return (false, skip_last_byte(cur_data.as_slice())); - } else { - // first read - - if new_generation & 1 == 1 { - // mid-write - return (false, "".as_bytes()); - } - - if let Some(success) = fetch_data(self) { - return success; - } - } - } else if let Some(handle) = self.open() { - self.handle.replace(handle); - return self.read(); - } - - (false, "".as_bytes()) - } - - pub fn clear_reader(&mut self) { - self.handle.take(); - } -} - -impl>> OneWayShmWriter { - pub fn write(&self, contents: &[u8]) { - let mut mapped = self.handle.lock_or_panic(); - - let size = contents.len() + 1; // trailing zero byte, to keep some C code happy - mapped.ensure_space(std::mem::size_of::() + size); - - // Safety: ShmHandle is always big enough - // Actually &mut mapped.as_slice_mut() as RawData seems safe, but unsized locals are - // unstable - let data = unsafe { &mut *(mapped.as_slice_mut() as *mut [u8] as *mut RawData) }; - data.meta.generation.fetch_add(1, Ordering::AcqRel); - data.meta.size = size; - - data.as_slice_mut()[0..contents.len()].copy_from_slice(contents); - data.as_slice_mut()[contents.len()] = 0; - - data.meta.generation.fetch_add(1, Ordering::Release); - } - - pub fn as_slice(&self) -> &[u8] { - let mapped = self.handle.lock_or_panic(); - let data = unsafe { &*(mapped.as_slice() as *const [u8] as *const RawData) }; - if data.meta.size > 0 { - let slice = data.as_slice(); - &slice[..slice.len() - 1] // ignore the trailing zero - } else { - b"" - } - } - - pub fn size(&self) -> usize { - self.handle.lock_or_panic().as_slice().len() - } - - pub fn current_generation(&self) -> u64 { - let mapped = self.handle.lock_or_panic(); - let data = unsafe { &*(mapped.as_slice() as *const [u8] as *const RawData) }; - data.meta.generation.load(Ordering::Acquire) - } -} diff --git a/datadog-sidecar/src/service/agent_info.rs b/datadog-sidecar/src/service/agent_info.rs index 9854b138a9..c722a28a8b 100644 --- a/datadog-sidecar/src/service/agent_info.rs +++ b/datadog-sidecar/src/service/agent_info.rs @@ -8,10 +8,10 @@ //! It writes the raw agent response to shared memory at a fixed per-endpoint location, to be //! consumed be tracers. -use crate::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter}; use crate::primary_sidecar_identifier; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; +use datadog_ipc::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter}; use datadog_ipc::platform::NamedShmHandle; use futures::future::Shared; use futures::FutureExt; @@ -188,7 +188,11 @@ impl AgentInfoReader { pub fn new(endpoint: &Endpoint) -> AgentInfoReader { let path = info_path(endpoint); AgentInfoReader { - reader: OneWayShmReader::new(open_named_shm(&path).ok(), path), + reader: OneWayShmReader::new_with_opener( + open_named_shm(&path).ok(), + path, + |path| open_named_shm(path).ok(), + ), info: None, } } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 9954f6eb41..21b30a2559 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -8,10 +8,10 @@ use std::sync::OnceLock; use tokio::sync::mpsc; use tracing::{debug, info, warn}; -use crate::one_way_shared_memory::OneWayShmWriter; use crate::primary_sidecar_identifier; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; +use datadog_ipc::one_way_shared_memory::OneWayShmWriter; use datadog_ipc::platform::NamedShmHandle; use std::collections::{HashMap, HashSet, VecDeque}; use std::ffi::CString; diff --git a/datadog-sidecar/src/shm_remote_config.rs b/datadog-sidecar/src/shm_remote_config.rs index f0c3022d8f..1f6c39f317 100644 --- a/datadog-sidecar/src/shm_remote_config.rs +++ b/datadog-sidecar/src/shm_remote_config.rs @@ -1,15 +1,13 @@ // Unless explicitly stated otherwise all files in this repository are licensed under the Apache // License Version 2.0. This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present Datadog, Inc. -use crate::one_way_shared_memory::{ - open_named_shm, OneWayShmReader, OneWayShmWriter, ReaderOpener, -}; use crate::primary_sidecar_identifier; use crate::service::{DynamicInstrumentationConfigState, InstanceId}; use crate::tracer::SHM_LIMITER; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; -use datadog_ipc::platform::{FileBackedHandle, MappedMem, NamedShmHandle}; +use datadog_ipc::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter}; +use datadog_ipc::platform::{FileBackedHandle, NamedShmHandle}; use datadog_ipc::rate_limiter::ShmLimiter; use datadog_live_debugger::LiveDebuggingData; use libdd_common::{tag::Tag, MutexExt}; @@ -106,14 +104,19 @@ pub fn path_for_remote_config(id: &ConfigInvariants, target: &Arc) -> CS impl RemoteConfigReader { pub fn new(id: &ConfigInvariants, target: &Arc) -> RemoteConfigReader { let path = path_for_remote_config(id, target); - RemoteConfigReader(OneWayShmReader::new(open_named_shm(&path).ok(), path)) + RemoteConfigReader(OneWayShmReader::new_with_opener( + open_named_shm(&path).ok(), + path, + |path| open_named_shm(path).ok(), + )) } pub fn from_path(path: &CStr) -> Self { #[allow(clippy::unwrap_used)] - RemoteConfigReader(OneWayShmReader::new( + RemoteConfigReader(OneWayShmReader::new_with_opener( open_named_shm(path).ok(), CString::new(path.to_bytes()).unwrap(), + |path| open_named_shm(path).ok(), )) } @@ -143,11 +146,6 @@ impl RemoteConfigWriter { } } -impl ReaderOpener for OneWayShmReader { - fn open(&self) -> Option> { - open_named_shm(&self.extra).ok() - } -} #[derive(Default)] struct TargetInfo { diff --git a/libdd-remote-config/Cargo.toml b/libdd-remote-config/Cargo.toml index ae7988ceef..8a93596a0d 100644 --- a/libdd-remote-config/Cargo.toml +++ b/libdd-remote-config/Cargo.toml @@ -27,6 +27,8 @@ client = [ ] regex-lite = ["libdd-common/regex-lite"] +# Expose select types (e.g. RemoteConfigCapabilities) to Python via PyO3. +pyo3 = ["dep:pyo3"] # Enable HTTPS support in the fetcher (rustls + ring via libdd-common). https = ["libdd-common/https"] # FIPS-compliant crypto provider (aws-lc-rs via libdd-common). Unix only. @@ -38,6 +40,7 @@ anyhow = { version = "1.0" } libdd-common = { path = "../libdd-common", version = "4.2.0", default-features = false } libdd-trace-protobuf = { path = "../libdd-trace-protobuf", version = "3.0.2", optional = true } hyper = { workspace = true, optional = true, default-features = false } +pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } http-body-util = {version = "0.1", optional = true } http = { version = "1.1", optional = true } base64 = { version = "0.22.1", optional = true } diff --git a/libdd-remote-config/src/fetch/fetcher.rs b/libdd-remote-config/src/fetch/fetcher.rs index ffa55def0e..90f33e1c53 100644 --- a/libdd-remote-config/src/fetch/fetcher.rs +++ b/libdd-remote-config/src/fetch/fetcher.rs @@ -183,11 +183,11 @@ impl ConfigFetcherState { target_file.state.apply_error = "".to_string(); } ConfigApplyState::Acknowledged => { - target_file.state.apply_state = 1; + target_file.state.apply_state = 2; target_file.state.apply_error = "".to_string(); } ConfigApplyState::Error(error) => { - target_file.state.apply_state = 1; + target_file.state.apply_state = 3; target_file.state.apply_error = error; } } diff --git a/libdd-remote-config/src/fetch/single.rs b/libdd-remote-config/src/fetch/single.rs index 3d48c69183..1eacf7c9e5 100644 --- a/libdd-remote-config/src/fetch/single.rs +++ b/libdd-remote-config/src/fetch/single.rs @@ -77,6 +77,18 @@ impl SingleFetcher { pub fn set_extra_services(&mut self, services: Vec) { self.opaque_state.set_extra_services(services); } + + /// Replace the set of subscribed products and capabilities. + /// + /// Hosts whose product/capability set changes at runtime (e.g. enabling ASM + /// products on remote activation) call this before a subsequent `fetch_once`. + pub fn set_product_capabilities( + &mut self, + products: Vec, + capabilities: Vec, + ) { + self.product_capabilities = ConfigProductCapabilities::new(products, capabilities); + } } pub struct SingleChangesFetcher @@ -129,4 +141,13 @@ where pub fn set_extra_services(&mut self, services: Vec) { self.fetcher.set_extra_services(services); } + + /// See [`SingleFetcher::set_product_capabilities`]. + pub fn set_product_capabilities( + &mut self, + products: Vec, + capabilities: Vec, + ) { + self.fetcher.set_product_capabilities(products, capabilities); + } } diff --git a/libdd-remote-config/src/file_change_tracker.rs b/libdd-remote-config/src/file_change_tracker.rs index 9600849d2d..16b2323b58 100644 --- a/libdd-remote-config/src/file_change_tracker.rs +++ b/libdd-remote-config/src/file_change_tracker.rs @@ -57,14 +57,16 @@ impl ChangeTracker { let files = HashSet::from_iter(files.into_iter().map(FilePathBasedArc)); let mut changes = vec![]; - for file in files.difference(&self.last_files) { - changes.push(Change::Add(file.0.clone())); - } - + // Emit removals before additions/updates so that consumers observe + // configuration deletions strictly before new or updated configs. for file in self.last_files.difference(&files) { changes.push(Change::Remove(file.0.clone())); } + for file in files.difference(&self.last_files) { + changes.push(Change::Add(file.0.clone())); + } + for (updated_file, old_contents) in updated.into_iter() { let file = FilePathBasedArc(updated_file); if files.contains(&file) { @@ -76,3 +78,49 @@ impl ChangeTracker { changes } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{RemoteConfigPath, RemoteConfigProduct, RemoteConfigSource}; + + struct TestFile(RemoteConfigPath); + + impl FilePath for TestFile { + fn path(&self) -> &RemoteConfigPath { + &self.0 + } + } + + fn file(name: &str) -> Arc { + Arc::new(TestFile(RemoteConfigPath { + source: RemoteConfigSource::Employee, + product: RemoteConfigProduct::ApmTracing, + config_id: "id".to_string(), + name: name.to_string(), + })) + } + + #[test] + fn removals_are_emitted_before_additions() { + let mut tracker: ChangeTracker = ChangeTracker::default(); + let a = file("a"); + // First round: "a" is added. + tracker.get_changes::<()>(vec![a.clone()], vec![]); + + // Second round: "a" is gone, "b" appears. The removal of "a" must be + // emitted before the addition of "b". + let b = file("b"); + let changes = tracker.get_changes::<()>(vec![b.clone()], vec![]); + + assert_eq!(changes.len(), 2); + match &changes[0] { + Change::Remove(f) => assert_eq!(f.path().name, "a"), + _ => panic!("expected the removal to be emitted first"), + } + match &changes[1] { + Change::Add(f) => assert_eq!(f.path().name, "b"), + _ => panic!("expected the addition to be emitted after the removal"), + } + } +} diff --git a/libdd-remote-config/src/lib.rs b/libdd-remote-config/src/lib.rs index 462362d160..3922b40199 100644 --- a/libdd-remote-config/src/lib.rs +++ b/libdd-remote-config/src/lib.rs @@ -39,6 +39,7 @@ pub struct Target { #[repr(C)] #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] +#[cfg_attr(feature = "pyo3", pyo3::pyclass(eq, eq_int, from_py_object))] pub enum RemoteConfigCapabilities { AsmActivation = 1, AsmIpBlocking = 2, @@ -83,6 +84,9 @@ pub enum RemoteConfigCapabilities { ApmTracingEnableLiveDebugging = 41, AsmDdMulticonfig = 42, AsmTraceTaggingRules = 43, + AsmExtendedDataCollection = 44, ApmTracingMulticonfig = 45, FfeFlagConfigurationRules = 46, + DdDataStreamsTransactionExtractors = 47, + LlmObsActivation = 48, } diff --git a/libdd-remote-config/src/path.rs b/libdd-remote-config/src/path.rs index 7ed909fab3..2111b3209b 100644 --- a/libdd-remote-config/src/path.rs +++ b/libdd-remote-config/src/path.rs @@ -14,6 +14,7 @@ pub enum RemoteConfigSource { #[repr(C)] #[derive(Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[cfg_attr(feature = "pyo3", pyo3::pyclass(eq, eq_int, hash, frozen, from_py_object))] pub enum RemoteConfigProduct { AgentConfig, AgentTask, @@ -24,6 +25,15 @@ pub enum RemoteConfigProduct { AsmFeatures, FfeFlags, LiveDebugger, + LiveDebuggerSymbolDb, +} + +#[cfg(feature = "pyo3")] +#[pyo3::pymethods] +impl RemoteConfigProduct { + fn __str__(&self) -> String { + self.to_string() + } } impl Display for RemoteConfigProduct { @@ -38,6 +48,7 @@ impl Display for RemoteConfigProduct { RemoteConfigProduct::AsmFeatures => "ASM_FEATURES", RemoteConfigProduct::FfeFlags => "FFE_FLAGS", RemoteConfigProduct::LiveDebugger => "LIVE_DEBUGGING", + RemoteConfigProduct::LiveDebuggerSymbolDb => "LIVE_DEBUGGING_SYMBOL_DB", }; write!(f, "{str}") } @@ -88,6 +99,7 @@ impl RemoteConfigPath { "ASM_FEATURES" => RemoteConfigProduct::AsmFeatures, "FFE_FLAGS" => RemoteConfigProduct::FfeFlags, "LIVE_DEBUGGING" => RemoteConfigProduct::LiveDebugger, + "LIVE_DEBUGGING_SYMBOL_DB" => RemoteConfigProduct::LiveDebuggerSymbolDb, product => anyhow::bail!("Unknown product {}", product), }, config_id: parts[parts.len() - 2], From d4069a158c7671fdf32b717a939c88d05103c731 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Tue, 16 Jun 2026 16:21:48 +0200 Subject: [PATCH 2/4] Apply review Signed-off-by: Bob Weinand --- Cargo.lock | 3 +- datadog-ipc/src/one_way_shared_memory.rs | 62 +++++++++++++--------- datadog-ipc/src/platform/mem_handle.rs | 49 +++++++++-------- datadog-sidecar-ffi/src/lib.rs | 6 +-- datadog-sidecar/src/agent_remote_config.rs | 9 ++-- datadog-sidecar/src/service/agent_info.rs | 8 ++- datadog-sidecar/src/shm_remote_config.rs | 1 - libdd-remote-config/Cargo.toml | 6 +-- libdd-remote-config/src/fetch/single.rs | 3 +- libdd-remote-config/src/lib.rs | 15 +++++- libdd-remote-config/src/path.rs | 22 ++++---- 11 files changed, 108 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3071bc8b81..77fc2d0c81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3269,11 +3269,12 @@ dependencies = [ "libdd-remote-config", "libdd-trace-protobuf", "manual_future", - "pyo3", "serde", "serde_json", "serde_with", "sha2", + "strum", + "strum_macros", "thiserror 2.0.17", "time", "tokio", diff --git a/datadog-ipc/src/one_way_shared_memory.rs b/datadog-ipc/src/one_way_shared_memory.rs index 79e5600b40..2e1aa5670e 100644 --- a/datadog-ipc/src/one_way_shared_memory.rs +++ b/datadog-ipc/src/one_way_shared_memory.rs @@ -22,7 +22,7 @@ use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle, ShmHandle}; use libdd_common::MutexExt; use std::ffi::{CStr, CString}; use std::io; -use std::sync::atomic::{fence, AtomicU64, Ordering}; +use std::sync::atomic::{fence, AtomicU32, AtomicU64, Ordering}; use std::sync::Mutex; use std::time::Duration; @@ -33,6 +33,8 @@ where handle: Mutex>, } +pub type OneWayShmOpener = fn(&D) -> Option>; + pub struct OneWayShmReader where T: FileBackedHandle + From>, @@ -44,7 +46,7 @@ where // (e.g. a named segment opened by path). Readers over an inherited anonymous // mapping leave this `None`. A fn pointer keeps the reader `Send`/`Sync` // without a trait impl (which would be orphan-illegal for foreign `D`). - opener: Option Option>>, + opener: Option>, pub extra: D, } @@ -91,12 +93,16 @@ unsafe fn reinterpret_u8_as_u64_slice(slice: &[u8]) -> &[u64] { // The `futex`-based wakeup is gated behind the `one_way_shm_futex` feature (and // Linux, where cross-process `futex` on shared memory is supported). When it is // disabled, `wait_for_change` falls back to a timed sleep (callers poll) and -// `write` skips the wake — useful for consumers (e.g. the sidecar) that notify -// out of band. +// `write` skips the wake; for consumers like PHP that desire out of band +// notification, we can skip the futex_wake syscall overhead. // // `addr` points to the 32-bit wait word (the low 32 bits of the generation // counter). It must be 4-byte aligned and live in shared memory. -#[cfg(all(feature = "one_way_shm_futex", target_os = "linux"))] +#[cfg(all( + feature = "one_way_shm_futex", + target_os = "linux", + target_endian = "little" +))] fn futex_wake(addr: *const u32) { // FUTEX_WAKE (non-private) on a shared mapping wakes waiters across // processes. i32::MAX => wake all waiters. @@ -105,7 +111,11 @@ fn futex_wake(addr: *const u32) { } } -#[cfg(all(feature = "one_way_shm_futex", target_os = "linux"))] +#[cfg(all( + feature = "one_way_shm_futex", + target_os = "linux", + target_endian = "little" +))] fn futex_wait(addr: *const u32, expected: u32, timeout: Duration) { let ts = libc::timespec { tv_sec: timeout.as_secs() as libc::time_t, @@ -125,10 +135,18 @@ fn futex_wait(addr: *const u32, expected: u32, timeout: Duration) { } } -#[cfg(not(all(feature = "one_way_shm_futex", target_os = "linux")))] +#[cfg(not(all( + feature = "one_way_shm_futex", + target_os = "linux", + target_endian = "little" +)))] fn futex_wake(_addr: *const u32) {} -#[cfg(not(all(feature = "one_way_shm_futex", target_os = "linux")))] +#[cfg(not(all( + feature = "one_way_shm_futex", + target_os = "linux", + target_endian = "little" +)))] fn futex_wait(_addr: *const u32, _expected: u32, timeout: Duration) { // No futex (feature disabled or unsupported platform); sleep so callers poll // the generation at the requested cadence. @@ -163,9 +181,9 @@ impl>, D> OneWayShmReader { /// inert (empty reads, `wait_for_change` sleeps) until a handle is supplied — /// use [`Self::new_with_opener`] when the segment should be opened lazily. /// `extra` is arbitrary caller state carried alongside the reader. - pub fn new(handle: Option>, extra: D) -> OneWayShmReader { + pub fn new(handle: MappedMem, extra: D) -> OneWayShmReader { OneWayShmReader { - handle, + handle: Some(handle), current_data: None, last_wait_generation: 0, opener: None, @@ -178,7 +196,7 @@ impl>, D> OneWayShmReader { pub fn new_with_opener( handle: Option>, extra: D, - opener: fn(&D) -> Option>, + opener: OneWayShmOpener, ) -> OneWayShmReader { OneWayShmReader { handle, @@ -267,6 +285,7 @@ impl>, D> OneWayShmReader { let copied_data: &RawData = new_mem.as_slice().into(); // Ensure a new write hasn't started yet + // Note that we actually care about is "dmb ishld" on ARM being emitted. fence(Ordering::Acquire); // prevent loads before from being reordered with gen load after if new_generation == source_data.meta.generation.load(Ordering::Relaxed) { reader.current_data.replace(new_mem); @@ -332,29 +351,24 @@ impl>, D> OneWayShmReader { // stable for the duration of the wait. `wait_for_change` and `read` are // only ever called from the same (reader) thread, so no concurrent remap // can invalidate this pointer. - let generation_ptr: *const AtomicU64 = { + let generation_ptr = { let Some(ref handle) = self.handle else { return false; }; let data: &RawData = unsafe { reinterpret_u8_as_u64_slice(handle.as_slice()) }.into(); - &data.meta.generation as *const AtomicU64 + data.meta.generation.as_ptr().cast::() }; - // Safety: see comment above — the mapping outlives this call. - let generation = unsafe { &*generation_ptr }; - // The futex wait word is the low 32 bits of the generation counter (its - // incremental nature makes 32 bits sufficient). On little-endian targets - // (the only ones we futex on) that is the start of the 8-byte counter. - let wait_word = generation_ptr.cast::(); - - let current = generation.load(Ordering::Acquire) as u32; + let generation = unsafe { AtomicU32::from_ptr(generation_ptr) }; + + let current = generation.load(Ordering::Acquire); if current != self.last_wait_generation { self.last_wait_generation = current; return true; } - futex_wait(wait_word, current, timeout); + futex_wait(generation_ptr, current, timeout); - let after = generation.load(Ordering::Acquire) as u32; + let after = generation.load(Ordering::Acquire); let changed = after != self.last_wait_generation; self.last_wait_generation = after; changed @@ -394,7 +408,7 @@ impl>> OneWayShmWriter { // Actually &mut mapped.as_slice_mut() as RawData seems safe, but unsized locals are // unstable let data = unsafe { &mut *(mapped.as_slice_mut() as *mut [u8] as *mut RawData) }; - data.meta.generation.fetch_add(1, Ordering::AcqRel); + data.meta.generation.fetch_add(1, Ordering::Acquire); data.meta.size = size; data.as_slice_mut()[0..contents.len()].copy_from_slice(contents); diff --git a/datadog-ipc/src/platform/mem_handle.rs b/datadog-ipc/src/platform/mem_handle.rs index d406c64259..97ddffcb59 100644 --- a/datadog-ipc/src/platform/mem_handle.rs +++ b/datadog-ipc/src/platform/mem_handle.rs @@ -88,29 +88,36 @@ where fn get_shm_mut(&mut self) -> &mut ShmHandle; #[cfg(all(unix, not(target_os = "macos")))] fn resize(&mut self, size: usize) -> anyhow::Result<()> { - unsafe { - self.set_mapping_size(size)?; - } - let new_size = self.get_shm().size as libc::off_t; - let fd = self.get_shm().handle.as_owned_fd()?; - // Try to se fallocate on Linux to eagerly commit the new pages: ENOSPC at resize time is - // recoverable; a later SIGBUS mid-execution is not. - #[cfg(target_os = "linux")] - match nix::fcntl::fallocate( - fd.as_raw_fd(), - nix::fcntl::FallocateFlags::empty(), - 0, - new_size, - ) { - Err(nix::Error::EPERM | nix::Error::ENOSYS | nix::Error::ENOTSUP) => { - nix::unistd::ftruncate(fd, new_size)? + let old_size = self.get_shm().size; + fn do_resize(handle: &mut F, size: usize) -> anyhow::Result<()> { + unsafe { + handle.set_mapping_size(size)?; + } + let new_size = handle.get_shm().size as libc::off_t; + let fd = handle.get_shm().handle.as_owned_fd()?; + // Try to use fallocate on Linux to eagerly commit the new pages: ENOSPC at resize time + // is recoverable; a later SIGBUS mid-execution is not. + #[cfg(target_os = "linux")] + match nix::fcntl::fallocate( + fd.as_raw_fd(), + nix::fcntl::FallocateFlags::empty(), + 0, + new_size, + ) { + Err(nix::Error::EPERM | nix::Error::ENOSYS | nix::Error::ENOTSUP) => { + nix::unistd::ftruncate(fd, new_size)? + } + Err(e) => return Err(e.into()), + Ok(_) => {} } - Err(e) => return Err(e.into()), - Ok(_) => {} + #[cfg(not(target_os = "linux"))] + nix::unistd::ftruncate(&fd, new_size)?; + Ok(()) } - #[cfg(not(target_os = "linux"))] - nix::unistd::ftruncate(&fd, new_size)?; - Ok(()) + // Reset on failure + do_resize(self, size).inspect_err(|_| unsafe { + let _ = self.set_mapping_size(old_size); + }) } /// # Safety /// Calling function needs to ensure it's appropriately resized diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index f3c5c97254..25fa4550bc 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -14,13 +14,10 @@ use datadog_ipc::platform::{ FileBackedHandle, MappedMem, NamedShmHandle, PlatformHandle, ShmHandle, }; use datadog_live_debugger::debugger_defs::DebuggerPayload; -use datadog_sidecar::agent_remote_config::{ - new_reader, reader_from_shm, AgentRemoteConfigEndpoint, AgentRemoteConfigWriter, -}; +use datadog_sidecar::agent_remote_config::{new_reader, reader_from_shm, AgentRemoteConfigWriter}; use datadog_sidecar::config; use datadog_sidecar::config::LogMethod; use datadog_sidecar::crashtracker::crashtracker_unix_socket_path; -use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener}; use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ @@ -221,7 +218,6 @@ fn ddog_agent_remote_config_read_generic<'a, T>( ) -> bool where T: FileBackedHandle + From>, - OneWayShmReader>: ReaderOpener, { let (new, contents) = reader.read(); *data = CharSlice::from_bytes(contents); diff --git a/datadog-sidecar/src/agent_remote_config.rs b/datadog-sidecar/src/agent_remote_config.rs index 70590a84d0..0e75283b39 100644 --- a/datadog-sidecar/src/agent_remote_config.rs +++ b/datadog-sidecar/src/agent_remote_config.rs @@ -62,13 +62,17 @@ pub fn new_reader(endpoint: &Endpoint) -> AgentRemoteConfigReader io::Result> { Ok(AgentRemoteConfigReader(OneWayShmReader::new( - Some(handle.map()?), + handle.map()?, None, ))) } @@ -79,7 +83,6 @@ pub fn new_writer(endpoint: &Endpoint) -> io::Result>> AgentRemoteConfigReader { pub fn read(&mut self) -> (bool, &[u8]) { self.0.read() diff --git a/datadog-sidecar/src/service/agent_info.rs b/datadog-sidecar/src/service/agent_info.rs index c722a28a8b..a36fc0b6dc 100644 --- a/datadog-sidecar/src/service/agent_info.rs +++ b/datadog-sidecar/src/service/agent_info.rs @@ -188,11 +188,9 @@ impl AgentInfoReader { pub fn new(endpoint: &Endpoint) -> AgentInfoReader { let path = info_path(endpoint); AgentInfoReader { - reader: OneWayShmReader::new_with_opener( - open_named_shm(&path).ok(), - path, - |path| open_named_shm(path).ok(), - ), + reader: OneWayShmReader::new_with_opener(open_named_shm(&path).ok(), path, |path| { + open_named_shm(path).ok() + }), info: None, } } diff --git a/datadog-sidecar/src/shm_remote_config.rs b/datadog-sidecar/src/shm_remote_config.rs index 1f6c39f317..0fd3000ef5 100644 --- a/datadog-sidecar/src/shm_remote_config.rs +++ b/datadog-sidecar/src/shm_remote_config.rs @@ -146,7 +146,6 @@ impl RemoteConfigWriter { } } - #[derive(Default)] struct TargetInfo { preferred_dynamic_instrumentation: u32, diff --git a/libdd-remote-config/Cargo.toml b/libdd-remote-config/Cargo.toml index 8a93596a0d..adfc071271 100644 --- a/libdd-remote-config/Cargo.toml +++ b/libdd-remote-config/Cargo.toml @@ -27,8 +27,6 @@ client = [ ] regex-lite = ["libdd-common/regex-lite"] -# Expose select types (e.g. RemoteConfigCapabilities) to Python via PyO3. -pyo3 = ["dep:pyo3"] # Enable HTTPS support in the fetcher (rustls + ring via libdd-common). https = ["libdd-common/https"] # FIPS-compliant crypto provider (aws-lc-rs via libdd-common). Unix only. @@ -40,7 +38,6 @@ anyhow = { version = "1.0" } libdd-common = { path = "../libdd-common", version = "4.2.0", default-features = false } libdd-trace-protobuf = { path = "../libdd-trace-protobuf", version = "3.0.2", optional = true } hyper = { workspace = true, optional = true, default-features = false } -pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } http-body-util = {version = "0.1", optional = true } http = { version = "1.1", optional = true } base64 = { version = "0.22.1", optional = true } @@ -57,6 +54,9 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde_with = "3" thiserror = "2" hashbrown = "0.15" +# `EnumIter` for external consumers to make struct available to runtime +strum = { version = "0.26", default-features = false } +strum_macros = "0.26" # Test feature hyper-util = { workspace = true, features = ["service"], optional = true } diff --git a/libdd-remote-config/src/fetch/single.rs b/libdd-remote-config/src/fetch/single.rs index 1eacf7c9e5..fb44610906 100644 --- a/libdd-remote-config/src/fetch/single.rs +++ b/libdd-remote-config/src/fetch/single.rs @@ -148,6 +148,7 @@ where products: Vec, capabilities: Vec, ) { - self.fetcher.set_product_capabilities(products, capabilities); + self.fetcher + .set_product_capabilities(products, capabilities); } } diff --git a/libdd-remote-config/src/lib.rs b/libdd-remote-config/src/lib.rs index 3922b40199..a859c72e6c 100644 --- a/libdd-remote-config/src/lib.rs +++ b/libdd-remote-config/src/lib.rs @@ -37,9 +37,20 @@ pub struct Target { } #[repr(C)] -#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] +#[derive( + Debug, + Copy, + Clone, + Hash, + Eq, + PartialEq, + Serialize, + Deserialize, + strum_macros::EnumIter, + strum_macros::IntoStaticStr, + strum_macros::Display, +)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] -#[cfg_attr(feature = "pyo3", pyo3::pyclass(eq, eq_int, from_py_object))] pub enum RemoteConfigCapabilities { AsmActivation = 1, AsmIpBlocking = 2, diff --git a/libdd-remote-config/src/path.rs b/libdd-remote-config/src/path.rs index 2111b3209b..5e9f85d066 100644 --- a/libdd-remote-config/src/path.rs +++ b/libdd-remote-config/src/path.rs @@ -13,8 +13,18 @@ pub enum RemoteConfigSource { } #[repr(C)] -#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] -#[cfg_attr(feature = "pyo3", pyo3::pyclass(eq, eq_int, hash, frozen, from_py_object))] +#[derive( + Debug, + Copy, + Clone, + Eq, + Hash, + PartialEq, + Serialize, + Deserialize, + strum_macros::EnumIter, + strum_macros::IntoStaticStr, +)] pub enum RemoteConfigProduct { AgentConfig, AgentTask, @@ -28,14 +38,6 @@ pub enum RemoteConfigProduct { LiveDebuggerSymbolDb, } -#[cfg(feature = "pyo3")] -#[pyo3::pymethods] -impl RemoteConfigProduct { - fn __str__(&self) -> String { - self.to_string() - } -} - impl Display for RemoteConfigProduct { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let str = match self { From 8c76b5c9a60e24163773998b9dc8783c54f0b8a7 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Tue, 16 Jun 2026 22:56:49 +0200 Subject: [PATCH 3/4] Add SHM resize and access to file_store in SingleFetcher Signed-off-by: Bob Weinand --- datadog-ipc/src/one_way_shared_memory.rs | 12 ++++++++++++ datadog-ipc/src/platform/unix/mem_handle.rs | 7 +++++++ datadog-ipc/src/platform/unix/mem_handle_macos.rs | 8 ++++++++ datadog-ipc/src/platform/windows/mem_handle.rs | 6 ++++++ libdd-remote-config/src/fetch/single.rs | 5 +++++ 5 files changed, 38 insertions(+) diff --git a/datadog-ipc/src/one_way_shared_memory.rs b/datadog-ipc/src/one_way_shared_memory.rs index 2e1aa5670e..16d5d2879f 100644 --- a/datadog-ipc/src/one_way_shared_memory.rs +++ b/datadog-ipc/src/one_way_shared_memory.rs @@ -223,6 +223,18 @@ impl>, D> OneWayShmReader { } } +impl OneWayShmWriter { + /// Consume the writer, unmapping it and returning a handle to the segment — + /// for a forked child (or any consumer) that no longer needs to write and + /// just wants to hand the segment to a reader. No extra handle clones linger. + pub fn into_handle(self) -> ShmHandle { + self.handle + .into_inner() + .unwrap_or_else(|e| e.into_inner()) + .into() + } +} + impl>> OneWayShmWriter { /// Create a writer backed by a named shared-memory segment at `path`. /// diff --git a/datadog-ipc/src/platform/unix/mem_handle.rs b/datadog-ipc/src/platform/unix/mem_handle.rs index a247aac5d4..3a30f61367 100644 --- a/datadog-ipc/src/platform/unix/mem_handle.rs +++ b/datadog-ipc/src/platform/unix/mem_handle.rs @@ -156,6 +156,13 @@ impl ShmHandle { ftruncate(handle.as_owned_fd()?, size as off_t)?; Ok(ShmHandle { handle, size }) } + + /// Refresh the size of the shared memory segment + pub fn adjust_to_file_size(&mut self) -> io::Result<()> { + let fd = self.handle.as_owned_fd()?; + self.size = nix::sys::stat::fstat(fd.as_fd())?.st_size as usize; + Ok(()) + } } impl NamedShmHandle { diff --git a/datadog-ipc/src/platform/unix/mem_handle_macos.rs b/datadog-ipc/src/platform/unix/mem_handle_macos.rs index 8da792e7dd..f6472f0b23 100644 --- a/datadog-ipc/src/platform/unix/mem_handle_macos.rs +++ b/datadog-ipc/src/platform/unix/mem_handle_macos.rs @@ -201,6 +201,14 @@ impl>> MappedMem { } } +impl ShmHandle { + /// Refresh the size of the shared memory segment + pub fn adjust_to_file_size(&mut self) -> io::Result<()> { + self.size = NOT_COMMITTED; + Ok(()) + } +} + impl Drop for ShmPath { fn drop(&mut self) { _ = shm_unlink(path_slice(self.name.as_c_str())); diff --git a/datadog-ipc/src/platform/windows/mem_handle.rs b/datadog-ipc/src/platform/windows/mem_handle.rs index b2b65cda10..4ca21721b9 100644 --- a/datadog-ipc/src/platform/windows/mem_handle.rs +++ b/datadog-ipc/src/platform/windows/mem_handle.rs @@ -111,6 +111,12 @@ impl ShmHandle { size: size | NOT_COMMITTED, }) } + + /// Refresh the size of the shared memory segment + pub fn adjust_to_file_size(&mut self) -> std::io::Result<()> { + self.size = NOT_COMMITTED; + Ok(()) + } } impl NamedShmHandle { diff --git a/libdd-remote-config/src/fetch/single.rs b/libdd-remote-config/src/fetch/single.rs index fb44610906..bcc4398066 100644 --- a/libdd-remote-config/src/fetch/single.rs +++ b/libdd-remote-config/src/fetch/single.rs @@ -66,6 +66,11 @@ impl SingleFetcher { &self.client_id } + /// Accesses the underlying file storage (the [`ConfigFetcher`]'s `file_storage`). + pub fn file_storage(&self) -> &S { + &self.fetcher.file_storage + } + /// Sets the apply state on a stored file. pub fn set_config_state(&self, file: &RemoteConfigPath, state: ConfigApplyState) { self.fetcher.set_config_state(file, state) From c06b8a8e8ac384e73baea86c8effda52821cb638 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 17 Jun 2026 23:59:16 +0200 Subject: [PATCH 4/4] Fix compilation Signed-off-by: Bob Weinand --- datadog-ipc/src/platform/unix/mem_handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog-ipc/src/platform/unix/mem_handle.rs b/datadog-ipc/src/platform/unix/mem_handle.rs index 3a30f61367..dc74f9b52f 100644 --- a/datadog-ipc/src/platform/unix/mem_handle.rs +++ b/datadog-ipc/src/platform/unix/mem_handle.rs @@ -160,7 +160,7 @@ impl ShmHandle { /// Refresh the size of the shared memory segment pub fn adjust_to_file_size(&mut self) -> io::Result<()> { let fd = self.handle.as_owned_fd()?; - self.size = nix::sys::stat::fstat(fd.as_fd())?.st_size as usize; + self.size = nix::sys::stat::fstat(fd.as_raw_fd())?.st_size as usize; Ok(()) } }