diff --git a/Cargo.lock b/Cargo.lock index ee6985d30..c7f8b9dfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3612,6 +3612,7 @@ dependencies = [ "num_enum", "rand 0.9.4", "serde", + "serde_json", "thiserror 2.0.18", "tokio", "tracing", diff --git a/rs/moq-lite/Cargo.toml b/rs/moq-lite/Cargo.toml index bf6ed705d..5299288ba 100644 --- a/rs/moq-lite/Cargo.toml +++ b/rs/moq-lite/Cargo.toml @@ -13,7 +13,8 @@ keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [features] -serde = ["dep:serde"] +# Retained for backwards compatibility; serde is now always available. +serde = [] [dependencies] bytes = "1" @@ -21,7 +22,8 @@ conducer = { workspace = true } futures = "0.3" num_enum = "0.7" rand = "0.9.2" -serde = { workspace = true, optional = true, features = ["rc"] } +serde = { workspace = true, features = ["rc"] } +serde_json = "1" thiserror = "2" tokio = { workspace = true, features = ["macros", "io-util", "sync", "test-util", "time"] } tracing = "0.1" diff --git a/rs/moq-lite/src/client.rs b/rs/moq-lite/src/client.rs index c670dae9e..66a2ab4ad 100644 --- a/rs/moq-lite/src/client.rs +++ b/rs/moq-lite/src/client.rs @@ -1,6 +1,6 @@ use crate::{ ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED, - OriginConsumer, OriginProducer, Session, Version, Versions, + OriginConsumer, OriginProducer, Session, Stats, Version, Versions, coding::{self, Decode, Encode, Stream}, ietf, lite, setup, }; @@ -10,6 +10,7 @@ use crate::{ pub struct Client { publish: Option, consume: Option, + stats: Option, versions: Versions, } @@ -28,6 +29,13 @@ impl Client { self } + /// Attach a [`Stats`] aggregator. Per-broadcast and per-subscription counters + /// will be bumped through this handle for the lifetime of the session. 
+ pub fn with_stats(mut self, stats: impl Into>) -> Self { + self.stats = stats.into(); + self + } + /// Set both publish and consume from an `OriginProducer`. /// /// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`. @@ -121,6 +129,7 @@ impl Client { None, self.publish.clone(), self.consume.clone(), + self.stats.clone(), lite::Version::Lite04, )?; @@ -137,6 +146,7 @@ impl Client { None, self.publish.clone(), self.consume.clone(), + self.stats.clone(), lite::Version::Lite03, )?; @@ -182,6 +192,7 @@ impl Client { Some(stream), self.publish.clone(), self.consume.clone(), + self.stats.clone(), v, )? } diff --git a/rs/moq-lite/src/lib.rs b/rs/moq-lite/src/lib.rs index 86d0e394c..769e35269 100644 --- a/rs/moq-lite/src/lib.rs +++ b/rs/moq-lite/src/lib.rs @@ -56,6 +56,7 @@ mod path; mod server; mod session; mod setup; +mod stats; mod version; pub use client::*; @@ -65,6 +66,7 @@ pub use model::*; pub use path::*; pub use server::*; pub use session::*; +pub use stats::*; pub use version::*; // Re-export the bytes crate diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 73d6515e5..bbd437f12 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -5,7 +5,7 @@ use web_async::FuturesExt; use web_transport_trait::Stats; use crate::{ - AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, Track, TrackConsumer, + AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, Stats as MoqStats, Track, TrackConsumer, coding::{Stream, Writer}, lite::{ self, @@ -16,26 +16,38 @@ use crate::{ use super::Version; +pub(super) struct PublisherConfig { + /// The origin we read local broadcasts from. None gives this session a + /// dummy, immediately-closed origin (i.e. nothing to publish). + pub origin: Option, + /// Optional stats aggregator for this session's egress. + pub stats: Option, + /// Per-session origin id stamped onto outbound announce hops. 
Shared with + /// the matching `Subscriber` so reflected announces can be filtered. + pub self_origin: Origin, + pub version: Version, +} + pub(super) struct Publisher { session: S, origin: OriginConsumer, - // The session-level origin id stamped onto outbound hop chains. Shared - // with the Subscriber so it can optionally filter out reflected announces. + stats: Option, self_origin: Origin, priority: PriorityQueue, version: Version, } impl Publisher { - pub fn new(session: S, origin: Option, self_origin: Origin, version: Version) -> Self { + pub fn new(session: S, config: PublisherConfig) -> Self { // Default to a dummy origin that is immediately closed. - let origin = origin.unwrap_or_else(|| Origin::random().produce().consume()); + let origin = config.origin.unwrap_or_else(|| Origin::random().produce().consume()); Self { session, origin, - self_origin, + stats: config.stats, + self_origin: config.self_origin, priority: Default::default(), - version, + version: config.version, } } @@ -133,8 +145,9 @@ impl Publisher { let version = self.version; let self_origin = self.self_origin; + let stats = self.stats.clone(); web_async::spawn(async move { - if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix, self_origin, version).await { + if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix, self_origin, stats, version).await { match &err { Error::Cancel | Error::Transport(_) => { tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); @@ -156,10 +169,17 @@ impl Publisher { origin: &mut OriginConsumer, prefix: impl AsPath, self_origin: Origin, + stats: Option, version: Version, ) -> Result<(), Error> { let prefix = prefix.as_path(); + // Per-path stats guards: dropping the guard records `broadcasts_closed`. + // The origin contract guarantees announce/unannounce toggles per path, so a + // new active announcement must always be for a path with no live guard. 
+ let mut stats_guards: std::collections::HashMap = + std::collections::HashMap::new(); + match version { Version::Lite01 | Version::Lite02 => { let mut init = Vec::new(); @@ -171,10 +191,17 @@ impl Publisher { if active.is_some() { tracing::debug!(broadcast = %origin.absolute(&path), "announce"); + if let Some(stats) = stats.as_ref() { + let absolute = origin.absolute(&path).to_owned(); + let guard = stats.broadcast(&absolute).publisher(); + let prev = stats_guards.insert(absolute, guard); + debug_assert!(prev.is_none(), "origin announced a path that was already active"); + } init.push(suffix.to_owned()); } else { // A potential race. tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); + stats_guards.remove(&origin.absolute(&path).to_owned()); init.retain(|path| path != &suffix); } } @@ -210,10 +237,17 @@ impl Publisher { ); continue; } + if let Some(stats) = stats.as_ref() { + let absolute = origin.absolute(&path).to_owned(); + let guard = stats.broadcast(&absolute).publisher(); + let prev = stats_guards.insert(absolute, guard); + debug_assert!(prev.is_none(), "origin announced a path that was already active"); + } let msg = lite::Announce::Active { suffix, hops }; stream.writer.encode(&msg).await?; } else { tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); + stats_guards.remove(&origin.absolute(&path).to_owned()); // An ended announce doesn't need hops — the receiver matches on path only. let msg = lite::Announce::Ended { suffix, @@ -247,9 +281,24 @@ impl Publisher { let priority = self.priority.clone(); let version = self.version; + // Open a publisher-track stats guard tied to this subscription's lifetime. 
+ let track_stats = self + .stats + .as_ref() + .map(|stats| stats.broadcast(&absolute).publisher().track(&track)); + let session = self.session.clone(); web_async::spawn(async move { - if let Err(err) = Self::run_subscribe(session, &mut stream, &subscribe, broadcast, priority, version).await + if let Err(err) = Self::run_subscribe( + session, + &mut stream, + &subscribe, + broadcast, + priority, + track_stats, + version, + ) + .await { match &err { // TODO better classify WebTransport errors. @@ -275,6 +324,7 @@ impl Publisher { subscribe: &lite::Subscribe<'_>, consumer: Option, priority: PriorityQueue, + track_stats: Option, version: Version, ) -> Result<(), Error> { let track = Track { @@ -297,8 +347,9 @@ impl Publisher { stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; + let track_stats = track_stats.map(std::sync::Arc::new); tokio::select! { - res = Self::run_track(session, track, subscribe, priority, version) => res?, + res = Self::run_track(session, track, subscribe, priority, track_stats, version) => res?, res = stream.reader.closed() => res?, } @@ -311,6 +362,7 @@ impl Publisher { mut track: TrackConsumer, subscribe: &lite::Subscribe<'_>, priority: PriorityQueue, + track_stats: Option>, version: Version, ) -> Result<(), Error> { let mut tasks = FuturesUnordered::new(); @@ -340,7 +392,9 @@ impl Publisher { }; let priority = priority.insert(track.priority, sequence); - tasks.push(Self::serve_group(session.clone(), msg, priority, group, version).map(|_| ())); + tasks.push( + Self::serve_group(session.clone(), msg, priority, group, track_stats.clone(), version).map(|_| ()), + ); } } @@ -349,6 +403,7 @@ impl Publisher { msg: lite::Group, mut priority: PriorityHandle, mut group: GroupConsumer, + track_stats: Option>, version: Version, ) -> Result<(), Error> { // TODO add a way to open in priority order. 
@@ -358,6 +413,9 @@ impl Publisher { stream.set_priority(priority.current()); stream.encode(&lite::DataType::Group).await?; stream.encode(&msg).await?; + if let Some(s) = track_stats.as_deref() { + s.group(); + } loop { let frame = tokio::select! { @@ -377,6 +435,9 @@ impl Publisher { }; stream.encode(&frame.size).await?; + if let Some(s) = track_stats.as_deref() { + s.frame(); + } loop { let chunk = tokio::select! { @@ -391,7 +452,13 @@ impl Publisher { }; match chunk? { - Some(mut chunk) => stream.write_all(&mut chunk).await?, + Some(mut chunk) => { + let n = chunk.len() as u64; + stream.write_all(&mut chunk).await?; + if let Some(s) = track_stats.as_deref() { + s.bytes(n); + } + } None => break, } } diff --git a/rs/moq-lite/src/lite/session.rs b/rs/moq-lite/src/lite/session.rs index 293384e63..0589820bb 100644 --- a/rs/moq-lite/src/lite/session.rs +++ b/rs/moq-lite/src/lite/session.rs @@ -1,9 +1,9 @@ use crate::{ - BandwidthConsumer, BandwidthProducer, Error, Origin, OriginConsumer, OriginProducer, coding::Stream, + BandwidthConsumer, BandwidthProducer, Error, Origin, OriginConsumer, OriginProducer, Stats, coding::Stream, lite::SessionInfo, }; -use super::{Publisher, Subscriber, Version}; +use super::{Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; pub fn start( session: S, // The stream used to setup the session, after exchanging setup messages. @@ -13,6 +13,8 @@ pub fn start( publish: Option, // We will consume any remote broadcasts, inserting them into this origin. subscribe: Option, + // Optional stats aggregator. None disables instrumentation for this session. + stats: Option, // The version of the protocol to use. version: Version, ) -> Result, Error> { @@ -31,9 +33,26 @@ pub fn start( // Shared per-session origin: the publisher stamps it onto outbound // announce hops, and the subscriber carries it so callers can opt into // filtering out their own reflected announces. 
- let origin = Origin::random(); - let publisher = Publisher::new(session.clone(), publish, origin, version); - let subscriber = Subscriber::new(session.clone(), subscribe, recv_bw_for_sub, origin, version); + let self_origin = Origin::random(); + let publisher = Publisher::new( + session.clone(), + PublisherConfig { + origin: publish, + stats: stats.clone(), + self_origin, + version, + }, + ); + let subscriber = Subscriber::new( + session.clone(), + SubscriberConfig { + origin: subscribe, + recv_bandwidth: recv_bw_for_sub, + stats, + self_origin, + version, + }, + ); web_async::spawn(async move { let res = tokio::select! { diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index cd28417d7..fe637b57f 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -7,7 +7,7 @@ use futures::{StreamExt, stream::FuturesUnordered}; use crate::{ AsPath, BandwidthProducer, Broadcast, BroadcastDynamic, Error, Frame, FrameProducer, Group, GroupProducer, - OriginProducer, Path, PathOwned, TrackProducer, + OriginProducer, Path, PathOwned, Stats, SubscriberStats, SubscriberTrack, TrackProducer, coding::{Reader, Stream}, lite, model::BroadcastProducer, @@ -17,11 +17,25 @@ use super::Version; use web_async::Lock; +pub(super) struct SubscriberConfig { + /// The origin into which remote broadcasts are inserted. + pub origin: Option, + /// Receiver-side bandwidth producer for PROBE feedback. None disables the + /// feature (used by versions that don't carry probe streams). + pub recv_bandwidth: Option, + /// Optional stats aggregator for this session's ingress. + pub stats: Option, + /// Per-session origin id shared with the matching `Publisher`. + pub self_origin: crate::Origin, + pub version: Version, +} + #[derive(Clone)] pub(super) struct Subscriber { session: S, origin: Option, + stats: Option, recv_bandwidth: Option, // Session-level origin id shared with the Publisher. 
Kept so callers that // want to filter reflected announces can reuse the same id; for now only @@ -29,27 +43,28 @@ pub(super) struct Subscriber { // on seeing its own publishes as a confirmation signal). #[allow(dead_code)] self_origin: crate::Origin, - subscribes: Lock>, + subscribes: Lock>, next_id: Arc, version: Version, } +#[derive(Clone)] +struct TrackEntry { + producer: TrackProducer, + stats: Option>, +} + impl Subscriber { - pub fn new( - session: S, - origin: Option, - recv_bandwidth: Option, - self_origin: crate::Origin, - version: Version, - ) -> Self { + pub fn new(session: S, config: SubscriberConfig) -> Self { Self { session, - origin, - recv_bandwidth, - self_origin, + origin: config.origin, + stats: config.stats, + recv_bandwidth: config.recv_bandwidth, + self_origin: config.self_origin, subscribes: Default::default(), next_id: Default::default(), - version, + version: config.version, } } @@ -122,6 +137,10 @@ impl Subscriber { stream.writer.encode(&msg).await?; let mut producers = HashMap::new(); + // Per-broadcast subscriber-side stats guards. Dropping the guard records + // `subscriber.broadcasts_closed`. Removed alongside the BroadcastProducer + // when an Ended announcement arrives. + let mut stats_guards: HashMap = HashMap::new(); match self.version { Version::Lite01 | Version::Lite02 => { @@ -129,7 +148,11 @@ impl Subscriber { for suffix in msg.suffixes { let path = prefix.join(&suffix); // Lite01/02 don't carry hop information; the broadcast starts with an empty chain. 
- self.start_announce(path, crate::OriginList::new(), &mut producers)?; + self.start_announce(path.clone(), crate::OriginList::new(), &mut producers)?; + if let Some(stats) = self.stats.as_ref() { + let guard = stats.broadcast(&path).subscriber(); + stats_guards.insert(path, guard); + } } } _ => { @@ -141,7 +164,11 @@ impl Subscriber { match announce { lite::Announce::Active { suffix, hops } => { let path = prefix.join(&suffix); - self.start_announce(path, hops, &mut producers)?; + self.start_announce(path.clone(), hops, &mut producers)?; + if let Some(stats) = self.stats.as_ref() { + let guard = stats.broadcast(&path).subscriber(); + stats_guards.insert(path, guard); + } } lite::Announce::Ended { suffix, .. } => { let path = prefix.join(&suffix); @@ -150,6 +177,7 @@ impl Subscriber { // Abort the producer. let mut producer = producers.remove(&path).ok_or(Error::NotFound)?; producer.abort(Error::Cancel).ok(); + stats_guards.remove(&path); } } } @@ -266,7 +294,20 @@ impl Subscriber { } async fn run_subscribe(&mut self, id: u64, path: PathOwned, broadcast: BroadcastDynamic, mut track: TrackProducer) { - self.subscribes.lock().insert(id, track.clone()); + // Subscriber-side track stats; counters bump as frames/bytes/groups arrive. + // Drop on subscription end records `subscriber.subscriptions_closed`. 
+ let track_stats = self + .stats + .as_ref() + .map(|stats| Arc::new(stats.broadcast(&path).subscriber().track(&track.name))); + + self.subscribes.lock().insert( + id, + TrackEntry { + producer: track.clone(), + stats: track_stats.clone(), + }, + ); let msg = lite::Subscribe { id, @@ -338,19 +379,24 @@ impl Subscriber { pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { let hdr: lite::Group = stream.decode().await?; - let (mut group, track) = { + let (mut group, track, track_stats) = { let mut subs = self.subscribes.lock(); - let track = subs.get_mut(&hdr.subscribe).ok_or(Error::Cancel)?; + let entry = subs.get_mut(&hdr.subscribe).ok_or(Error::Cancel)?; let group_info = Group { sequence: hdr.sequence }; - let group = track.create_group(group_info)?; - (group, track.clone()) + let group = entry.producer.create_group(group_info)?; + (group, entry.producer.clone(), entry.stats.clone()) }; + // Bump groups counter for this incoming group on the subscriber side. + if let Some(s) = track_stats.as_deref() { + s.group(); + } + let res = tokio::select! { err = track.closed() => Err(err), err = group.closed() => Err(err), - res = self.run_group(stream, group.clone()) => res, + res = self.run_group(stream, group.clone(), track_stats.clone()) => res, }; match res { @@ -373,11 +419,15 @@ impl Subscriber { &mut self, stream: &mut Reader, mut group: GroupProducer, + track_stats: Option>, ) -> Result<(), Error> { while let Some(size) = stream.decode_maybe::().await? 
{ let mut frame = group.create_frame(Frame { size })?; + if let Some(s) = track_stats.as_deref() { + s.frame(); + } - if let Err(err) = self.run_frame(stream, &mut frame).await { + if let Err(err) = self.run_frame(stream, &mut frame, track_stats.as_deref()).await { let _ = frame.abort(err.clone()); return Err(err); } @@ -392,6 +442,7 @@ impl Subscriber { &mut self, stream: &mut Reader, frame: &mut FrameProducer, + track_stats: Option<&SubscriberTrack>, ) -> Result<(), Error> { // FrameProducer impls BufMut over its pre-allocated per-frame buffer, so // read_buf writes QUIC stream bytes directly into the frame — no @@ -399,7 +450,11 @@ impl Subscriber { // as we drain it. while bytes::BufMut::has_remaining_mut(frame) { match stream.read_buf(frame).await? { - Some(n) if n > 0 => {} + Some(n) if n > 0 => { + if let Some(s) = track_stats { + s.bytes(n as u64); + } + } _ => return Err(Error::WrongSize), } } diff --git a/rs/moq-lite/src/model/origin.rs b/rs/moq-lite/src/model/origin.rs index 4eb761507..1e05a7026 100644 --- a/rs/moq-lite/src/model/origin.rs +++ b/rs/moq-lite/src/model/origin.rs @@ -743,8 +743,27 @@ impl OriginConsumer { /// The same path won't be announced/unannounced twice, instead it will toggle. /// Returns None if the consumer is closed. /// + /// Skips paths where any segment starts with `.` (see [`Path::is_hidden`]). + /// Use [`Self::announced_all`] to receive every update including hidden paths. + /// /// Note: The returned path is absolute and will always match this consumer's prefix. pub async fn announced(&mut self) -> Option { + loop { + let next = self.updates.recv().await?; + if !next.0.is_hidden() { + return Some(next); + } + } + } + + /// Like [`Self::announced`] but returns every update, including paths where any + /// segment starts with `.`. + /// + /// Use this when you need to observe infrastructure paths (e.g. `.stats/...`) + /// alongside user-visible broadcasts. 
Do not mix calls to this and + /// [`Self::announced`] on the same consumer instance: each update is delivered + /// exactly once, so calls compete for the same queue. + pub async fn announced_all(&mut self) -> Option { self.updates.recv().await } @@ -752,7 +771,21 @@ impl OriginConsumer { /// /// Returns None if there is no update available; NOT because the consumer is closed. /// You have to use `is_closed` to check if the consumer is closed. + /// + /// Skips paths where any segment starts with `.` (see [`Path::is_hidden`]). + /// Use [`Self::try_announced_all`] to receive every update including hidden paths. pub fn try_announced(&mut self) -> Option { + loop { + let next = self.updates.try_recv().ok()?; + if !next.0.is_hidden() { + return Some(next); + } + } + } + + /// Like [`Self::try_announced`] but returns every update, including paths where + /// any segment starts with `.`. See [`Self::announced_all`] for caveats. + pub fn try_announced_all(&mut self) -> Option { self.updates.try_recv().ok() } @@ -797,7 +830,7 @@ impl OriginConsumer { } loop { - let (announced, broadcast) = consumer.announced().await?; + let (announced, broadcast) = consumer.announced_all().await?; // `scope` narrows by prefix, but we only want an exact-path match. if announced.as_path() == path { if let Some(broadcast) = broadcast { @@ -1894,4 +1927,92 @@ mod tests { .expect("must not block"); assert!(result.is_none()); } + + #[tokio::test] + async fn test_announced_filters_hidden() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let visible = Broadcast::new().produce(); + let hidden = Broadcast::new().produce(); + + origin.publish_broadcast("foo/bar", visible.consume()); + origin.publish_broadcast(".stats/use", hidden.consume()); + + let mut consumer = origin.consume(); + + // announced() should only see the visible one. 
+ consumer.assert_next("foo/bar", &visible.consume()); + consumer.assert_next_wait(); + } + + #[tokio::test] + async fn test_announced_all_returns_every_update() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let visible = Broadcast::new().produce(); + let hidden = Broadcast::new().produce(); + + origin.publish_broadcast("foo/bar", visible.consume()); + origin.publish_broadcast(".stats/use", hidden.consume()); + + let mut consumer = origin.consume(); + + // announced_all() should see both, in publish order. + let (path, _) = consumer + .announced_all() + .now_or_never() + .expect("next blocked") + .expect("no next"); + assert_eq!(path, Path::new("foo/bar")); + + let (path, broadcast) = consumer + .announced_all() + .now_or_never() + .expect("next blocked") + .expect("no next"); + assert_eq!(path, Path::new(".stats/use")); + assert!(broadcast.unwrap().is_clone(&hidden.consume())); + + // And nothing else. + assert!(consumer.announced_all().now_or_never().is_none()); + } + + #[tokio::test] + async fn test_try_announced_filters_hidden() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let visible = Broadcast::new().produce(); + let hidden = Broadcast::new().produce(); + + origin.publish_broadcast(".stats/use", hidden.consume()); + origin.publish_broadcast("foo/bar", visible.consume()); + + let mut consumer = origin.consume(); + + // try_announced() should skip the hidden one and return the visible one. + let (path, _) = consumer.try_announced().expect("expected announce"); + assert_eq!(path, Path::new("foo/bar")); + assert!(consumer.try_announced().is_none()); + } + + #[tokio::test] + async fn test_announced_broadcast_hidden() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let hidden = Broadcast::new().produce(); + + origin.publish_broadcast(".stats/use", hidden.consume()); + + // announced_broadcast() must work for hidden paths too. 
+ let consumer = origin.consume(); + let result = consumer + .announced_broadcast(".stats/use") + .now_or_never() + .expect("next blocked"); + assert!(result.is_some()); + } } diff --git a/rs/moq-lite/src/path.rs b/rs/moq-lite/src/path.rs index 0321897ca..b1e704486 100644 --- a/rs/moq-lite/src/path.rs +++ b/rs/moq-lite/src/path.rs @@ -190,6 +190,14 @@ impl<'a> Path<'a> { self.0.is_empty() } + /// True if any '/'-separated segment starts with '.'. + /// + /// Hidden paths represent infrastructure broadcasts (e.g. `.stats/...`) + /// that should not appear in default discovery. + pub fn is_hidden(&self) -> bool { + self.0.split('/').any(|s| s.starts_with('.')) + } + pub fn len(&self) -> usize { self.0.len() } @@ -873,6 +881,21 @@ mod tests { assert_eq!(prefix.strip_prefix("foo/bar/").unwrap().as_str(), ""); } + #[test] + fn test_is_hidden() { + assert!(!Path::new("").is_hidden()); + assert!(!Path::new("foo").is_hidden()); + assert!(!Path::new("foo/bar").is_hidden()); + assert!(!Path::new("foo/bar/baz").is_hidden()); + + assert!(Path::new(".foo").is_hidden()); + assert!(Path::new(".stats").is_hidden()); + assert!(Path::new(".stats/use").is_hidden()); + assert!(Path::new(".stats/demo/use").is_hidden()); + assert!(Path::new("foo/.bar").is_hidden()); + assert!(Path::new("foo/bar/.baz").is_hidden()); + } + #[test] fn test_prefix_list_dedup() { // Exact duplicates are removed diff --git a/rs/moq-lite/src/server.rs b/rs/moq-lite/src/server.rs index 79c5adfb1..54856d82f 100644 --- a/rs/moq-lite/src/server.rs +++ b/rs/moq-lite/src/server.rs @@ -1,6 +1,6 @@ use crate::{ ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED, - OriginConsumer, OriginProducer, Session, Version, Versions, + OriginConsumer, OriginProducer, Session, Stats, Version, Versions, coding::{Decode, Encode, Stream}, ietf, lite, setup, }; @@ -10,6 +10,7 @@ use crate::{ pub struct Server { publish: Option, consume: Option, + stats: Option, versions: Versions, } 
@@ -28,6 +29,13 @@ impl Server { self } + /// Attach a [`Stats`] aggregator. Per-broadcast and per-subscription counters + /// will be bumped through this handle for the lifetime of the session. + pub fn with_stats(mut self, stats: impl Into>) -> Self { + self.stats = stats.into(); + self + } + /// Set both publish and consume from an `OriginProducer`. /// /// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`. @@ -86,6 +94,7 @@ impl Server { )?; tracing::debug!(version = ?v, "connected"); + // TODO: ietf code path does not yet record stats. return Ok(Session::new(session, v, None)); } Some(ALPN_16) => { @@ -119,6 +128,7 @@ impl Server { None, self.publish.clone(), self.consume.clone(), + self.stats.clone(), lite::Version::Lite04, )?; @@ -135,6 +145,7 @@ impl Server { None, self.publish.clone(), self.consume.clone(), + self.stats.clone(), lite::Version::Lite03, )?; @@ -184,6 +195,7 @@ impl Server { Some(stream), self.publish.clone(), self.consume.clone(), + self.stats.clone(), v, )? } diff --git a/rs/moq-lite/src/stats.rs b/rs/moq-lite/src/stats.rs new file mode 100644 index 000000000..43e93d4c7 --- /dev/null +++ b/rs/moq-lite/src/stats.rs @@ -0,0 +1,688 @@ +//! Generic stats publishing for moq-lite sessions. +//! +//! [`Stats`] aggregates per-broadcast counter bumps into per-prefix levels and +//! publishes a `.stats//` broadcast on a caller-provided +//! [`OriginProducer`]. Each stats broadcast carries two tracks: +//! +//! * `publisher` - egress (counters bumped when serving subscriptions) +//! * `subscriber` - ingress (counters bumped when receiving data) +//! +//! A caller that wants to differentiate two classes of traffic (e.g. internal +//! cluster peers vs external customers) constructs two [`Stats`] instances with +//! different `name`s and hands each session the appropriate one via +//! [`crate::Client::with_stats`] / [`crate::Server::with_stats`]. +//! +//! # Lifecycle +//! +//! 
No background work runs while no role has an active subscription. The first +//! `track()` call on a level (in either role) spawns a per-level snapshot task +//! that ticks every second. The task exits the moment both roles report zero +//! active subscriptions, dropping its [`BroadcastProducer`] and unannouncing. +//! +//! # Cycles +//! +//! Calling [`Stats::broadcast`] for a hidden path (any segment starting with +//! `.`) returns an empty handle whose bumps no-op. This breaks the obvious +//! feedback loop where serving a `.stats/...` broadcast would generate more +//! stats traffic. + +use std::{ + collections::HashMap, + sync::{ + Arc, Weak, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use serde::Serialize; +use web_async::{Lock, spawn}; + +use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track}; + +/// Cumulative atomic counters for a single role on a level. +#[derive(Default, Debug)] +#[non_exhaustive] +pub struct Counters { + pub broadcasts: AtomicU64, + pub broadcasts_closed: AtomicU64, + pub subscriptions: AtomicU64, + pub subscriptions_closed: AtomicU64, + pub bytes: AtomicU64, + pub frames: AtomicU64, + pub groups: AtomicU64, +} + +impl Counters { + fn snapshot(&self) -> Snapshot { + Snapshot { + broadcasts: self.broadcasts.load(Ordering::Relaxed), + broadcasts_closed: self.broadcasts_closed.load(Ordering::Relaxed), + subscriptions: self.subscriptions.load(Ordering::Relaxed), + subscriptions_closed: self.subscriptions_closed.load(Ordering::Relaxed), + bytes: self.bytes.load(Ordering::Relaxed), + frames: self.frames.load(Ordering::Relaxed), + groups: self.groups.load(Ordering::Relaxed), + } + } + + fn active(&self) -> bool { + self.subscriptions.load(Ordering::Relaxed) > self.subscriptions_closed.load(Ordering::Relaxed) + } +} + +/// Top-level stats handle. Cheap to clone (`Arc` inside). 
+#[derive(Clone)] +pub struct Stats { + inner: Arc, +} + +struct StatsInner { + name: String, + levels: u32, + origin: OriginProducer, + entries: Lock>>, +} + +struct Level { + advertised: PathOwned, + publisher: Counters, + subscriber: Counters, + task: Lock>, // unit: presence means a snapshot task is running + origin: OriginProducer, + name: String, + level_key: PathOwned, +} + +impl Stats { + /// Build a new stats aggregator. + /// + /// * `name` is baked into the advertised path of every published stats broadcast, + /// following the convention `.stats//` (or `.stats/` for the root). + /// * `levels` controls how many path-prefix levels stats are bucketed into. A value + /// of `1` produces only the root bucket. `2` adds a per-first-segment bucket, and + /// so on. Levels deeper than the number of segments in a given broadcast path are + /// skipped (we never publish a level whose key equals the broadcast path itself). + /// * `origin` is the [`OriginProducer`] that receives `publish_broadcast` calls + /// for each `.stats/...` broadcast. + pub fn new(name: impl Into, levels: u32, origin: OriginProducer) -> Self { + Self { + inner: Arc::new(StatsInner { + name: name.into(), + levels, + origin, + entries: Lock::default(), + }), + } + } + + /// Returns the configured `name`. + pub fn name(&self) -> &str { + &self.inner.name + } + + /// Returns a per-broadcast handle. Cheap; level state is created lazily and cached. + /// + /// Hidden paths (any segment starting with `.`) return an empty handle whose bumps + /// are no-ops. This keeps stats traffic from feeding back into the aggregator. 
+ pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats { + let path = path.as_path(); + if path.is_hidden() { + return BroadcastStats { levels: Arc::from([]) }; + } + + let keys = level_keys(&path, self.inner.levels); + let mut entries = self.inner.entries.lock(); + let arcs: Vec> = keys + .into_iter() + .map(|key| { + entries + .entry(key.clone()) + .or_insert_with(|| { + let advertised = advertised_path(&key, &self.inner.name); + Arc::new(Level { + advertised, + publisher: Counters::default(), + subscriber: Counters::default(), + task: Lock::new(None), + origin: self.inner.origin.clone(), + name: self.inner.name.clone(), + level_key: key, + }) + }) + .clone() + }) + .collect(); + + BroadcastStats { levels: arcs.into() } + } +} + +/// A per-broadcast handle. Cheap to clone. +/// +/// Open a role-scoped guard via [`Self::publisher`] or [`Self::subscriber`]; each +/// returns a RAII handle whose creation bumps the matching `broadcasts` counter +/// and whose drop bumps `broadcasts_closed`. +#[derive(Clone)] +pub struct BroadcastStats { + levels: Arc<[Arc]>, +} + +impl BroadcastStats { + /// True if this handle is for a hidden path (no levels, all bumps are no-ops). + pub fn is_empty(&self) -> bool { + self.levels.is_empty() + } + + /// Open the publisher (egress) role for this broadcast. + pub fn publisher(&self) -> PublisherStats { + for level in self.levels.iter() { + level.publisher.broadcasts.fetch_add(1, Ordering::Relaxed); + } + PublisherStats { + levels: self.levels.clone(), + } + } + + /// Open the subscriber (ingress) role for this broadcast. + pub fn subscriber(&self) -> SubscriberStats { + for level in self.levels.iter() { + level.subscriber.broadcasts.fetch_add(1, Ordering::Relaxed); + } + SubscriberStats { + levels: self.levels.clone(), + } + } +} + +/// RAII broadcast guard for the publisher role. See [`BroadcastStats::publisher`]. 
+#[must_use = "drop the guard to record the broadcast as closed"] +pub struct PublisherStats { + levels: Arc<[Arc]>, +} + +impl PublisherStats { + /// Open a track-subscription guard. + /// + /// Bumps `subscriptions` on every level and (on the 0->N transition in any + /// role) spawns the level's snapshot task. Drop bumps `subscriptions_closed`. + /// + /// `_name` is currently unused; counters are per-level only. Reserved for + /// future per-track granularity. + pub fn track(&self, _name: &str) -> PublisherTrack { + for level in self.levels.iter() { + level.publisher.subscriptions.fetch_add(1, Ordering::Relaxed); + ensure_task(level); + } + PublisherTrack { + levels: self.levels.clone(), + } + } +} + +impl Drop for PublisherStats { + fn drop(&mut self) { + for level in self.levels.iter() { + level.publisher.broadcasts_closed.fetch_add(1, Ordering::Relaxed); + } + } +} + +/// RAII broadcast guard for the subscriber role. See [`BroadcastStats::subscriber`]. +#[must_use = "drop the guard to record the broadcast as closed"] +pub struct SubscriberStats { + levels: Arc<[Arc]>, +} + +impl SubscriberStats { + /// Open a track-subscription guard. Mirrors [`PublisherStats::track`]. + pub fn track(&self, _name: &str) -> SubscriberTrack { + for level in self.levels.iter() { + level.subscriber.subscriptions.fetch_add(1, Ordering::Relaxed); + ensure_task(level); + } + SubscriberTrack { + levels: self.levels.clone(), + } + } +} + +impl Drop for SubscriberStats { + fn drop(&mut self) { + for level in self.levels.iter() { + level.subscriber.broadcasts_closed.fetch_add(1, Ordering::Relaxed); + } + } +} + +/// RAII subscription guard for the publisher role. +#[must_use = "drop the guard to record the subscription as closed"] +pub struct PublisherTrack { + levels: Arc<[Arc]>, +} + +impl PublisherTrack { + /// Bumps `frames` once. 
+ pub fn frame(&self) { + for level in self.levels.iter() { + level.publisher.frames.fetch_add(1, Ordering::Relaxed); + } + } + + /// Bumps `bytes` by `n`. + pub fn bytes(&self, n: u64) { + for level in self.levels.iter() { + level.publisher.bytes.fetch_add(n, Ordering::Relaxed); + } + } + + /// Bumps `groups` once. + pub fn group(&self) { + for level in self.levels.iter() { + level.publisher.groups.fetch_add(1, Ordering::Relaxed); + } + } +} + +impl Drop for PublisherTrack { + fn drop(&mut self) { + for level in self.levels.iter() { + level.publisher.subscriptions_closed.fetch_add(1, Ordering::Relaxed); + } + } +} + +/// RAII subscription guard for the subscriber role. +#[must_use = "drop the guard to record the subscription as closed"] +pub struct SubscriberTrack { + levels: Arc<[Arc]>, +} + +impl SubscriberTrack { + /// Bumps `frames` once. + pub fn frame(&self) { + for level in self.levels.iter() { + level.subscriber.frames.fetch_add(1, Ordering::Relaxed); + } + } + + /// Bumps `bytes` by `n`. + pub fn bytes(&self, n: u64) { + for level in self.levels.iter() { + level.subscriber.bytes.fetch_add(n, Ordering::Relaxed); + } + } + + /// Bumps `groups` once. 
+ pub fn group(&self) { + for level in self.levels.iter() { + level.subscriber.groups.fetch_add(1, Ordering::Relaxed); + } + } +} + +impl Drop for SubscriberTrack { + fn drop(&mut self) { + for level in self.levels.iter() { + level.subscriber.subscriptions_closed.fetch_add(1, Ordering::Relaxed); + } + } +} + +fn ensure_task(level: &Arc) { + let mut slot = level.task.lock(); + if slot.is_none() { + *slot = Some(()); + let weak = Arc::downgrade(level); + spawn(run_publisher(weak)); + } +} + +async fn run_publisher(weak: Weak) { + let setup = { + let Some(level) = weak.upgrade() else { + return; + }; + let mut broadcast = Broadcast::new().produce(); + let publisher = match broadcast.create_track(Track { + name: "publisher".into(), + priority: 0, + }) { + Ok(t) => t, + Err(err) => { + tracing::warn!(?err, "stats: failed to create publisher track"); + clear_task(&level); + return; + } + }; + let subscriber = match broadcast.create_track(Track { + name: "subscriber".into(), + priority: 0, + }) { + Ok(t) => t, + Err(err) => { + tracing::warn!(?err, "stats: failed to create subscriber track"); + clear_task(&level); + return; + } + }; + level.origin.publish_broadcast(&level.advertised, broadcast.consume()); + (broadcast, publisher, subscriber) + }; + let (broadcast, mut publisher, mut subscriber) = setup; + + let mut tick = tokio::time::interval(Duration::from_secs(1)); + tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + tick.tick().await; + + let Some(level) = weak.upgrade() else { + return; + }; + + if !level.publisher.active() && !level.subscriber.active() { + // Take the task slot under the lock and re-check. Any subscribe that + // raced with us either landed before we set None (so it sees Some + // and won't respawn) or after, in which case it spawns a fresh task. 
+ let mut slot = level.task.lock(); + if !level.publisher.active() && !level.subscriber.active() { + *slot = None; + drop(slot); + drop(level); + // Drop `broadcast` to unannounce. Leftover producers/consumers + // follow the existing `closed()` watcher in OriginProducer. + drop(broadcast); + return; + } + } + + // Always emit a snapshot for both tracks. Idle roles see their counters + // held steady; that itself is informative for a billing service. + write_snapshot(&mut publisher, "publisher", &level, level.publisher.snapshot()); + write_snapshot(&mut subscriber, "subscriber", &level, level.subscriber.snapshot()); + } +} + +fn clear_task(level: &Level) { + *level.task.lock() = None; +} + +#[derive(Debug, Default, Clone, Copy, Serialize)] +struct Snapshot { + broadcasts: u64, + broadcasts_closed: u64, + subscriptions: u64, + subscriptions_closed: u64, + bytes: u64, + frames: u64, + groups: u64, +} + +#[derive(Debug, Serialize)] +struct SnapshotFrame<'a> { + v: u32, + name: &'a str, + level: &'a str, + role: &'a str, + ts_ms: u64, + #[serde(flatten)] + snapshot: Snapshot, +} + +fn write_snapshot(track: &mut crate::TrackProducer, role: &str, level: &Level, snapshot: Snapshot) { + let frame = SnapshotFrame { + v: 1, + name: &level.name, + level: level.level_key.as_str(), + role, + ts_ms: now_ms(), + snapshot, + }; + + let buf = match serde_json::to_vec(&frame) { + Ok(buf) => buf, + Err(err) => { + tracing::debug!(?err, role, level = %level.advertised, "stats: failed to serialize snapshot"); + return; + } + }; + + if let Err(err) = track.write_frame(buf) { + tracing::debug!(?err, role, level = %level.advertised, "stats: failed to write snapshot frame"); + } +} + +fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Compute the level prefix keys this broadcast contributes to. +/// +/// The keys are the prefixes of the broadcast path with 0..N segments, where N is +/// `min(levels, segments)`. 
The key with `segments` segments is intentionally +/// omitted: it would be equal to the broadcast path itself, which carries no +/// aggregation value. `levels == 0` produces no buckets (stats are effectively +/// disabled). +fn level_keys(broadcast: &Path, levels: u32) -> Vec { + if levels == 0 { + return Vec::new(); + } + if broadcast.is_empty() { + return vec![PathOwned::default()]; + } + + let segs: Vec<&str> = broadcast.as_str().split('/').collect(); + let max = (levels as usize).min(segs.len()); + (0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect() +} + +fn advertised_path(level_key: &Path, name: &str) -> PathOwned { + if level_key.is_empty() { + PathOwned::from(format!(".stats/{name}")) + } else { + PathOwned::from(format!(".stats/{}/{name}", level_key.as_str())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::Ordering::Relaxed; + + use crate::{Origin, Path}; + + use super::*; + + #[test] + fn level_keys_basic() { + let key = |s: &str, n: u32| { + level_keys(&Path::new(s), n) + .into_iter() + .map(|p| p.as_str().to_string()) + .collect::>() + }; + + assert_eq!(key("demo/bbb", 1), vec![""]); + assert_eq!(key("demo/bbb", 2), vec!["", "demo"]); + // Capped: broadcast is 2 segments, levels=3 still yields 2 keys. + assert_eq!(key("demo/bbb", 3), vec!["", "demo"]); + assert_eq!(key("a/b/c/d", 3), vec!["", "a", "a/b"]); + // 1-segment broadcast, levels=2 still yields just root. + assert_eq!(key("demo", 2), vec![""]); + // levels=0 yields no buckets at all. 
+ assert!(key("demo/bbb", 0).is_empty()); + } + + #[test] + fn advertised_path_root_and_nested() { + assert_eq!(advertised_path(&Path::new(""), "use").as_str(), ".stats/use"); + assert_eq!(advertised_path(&Path::new("demo"), "use").as_str(), ".stats/demo/use"); + assert_eq!( + advertised_path(&Path::new("demo/foo"), "use").as_str(), + ".stats/demo/foo/use" + ); + } + + #[tokio::test] + async fn publisher_bumps_publisher_counters() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let stats = Stats::new("use", 2, origin); + let bs = stats.broadcast("demo/bbb"); + let pub_role = bs.publisher(); + let track = pub_role.track("video"); + track.frame(); + track.bytes(100); + track.group(); + drop(track); + drop(pub_role); + + let entries = stats.inner.entries.lock(); + let root = entries.get(&PathOwned::from("")).expect("root level"); + assert_eq!(root.publisher.frames.load(Relaxed), 1); + assert_eq!(root.publisher.bytes.load(Relaxed), 100); + assert_eq!(root.publisher.groups.load(Relaxed), 1); + assert_eq!(root.publisher.subscriptions.load(Relaxed), 1); + assert_eq!(root.publisher.subscriptions_closed.load(Relaxed), 1); + assert_eq!(root.publisher.broadcasts.load(Relaxed), 1); + assert_eq!(root.publisher.broadcasts_closed.load(Relaxed), 1); + // Subscriber must remain untouched. 
+ assert_eq!(root.subscriber.bytes.load(Relaxed), 0); + assert_eq!(root.subscriber.broadcasts.load(Relaxed), 0); + } + + #[tokio::test] + async fn subscriber_bumps_subscriber_counters() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let stats = Stats::new("use", 1, origin); + let bs = stats.broadcast("demo/bbb"); + let sub_role = bs.subscriber(); + let track = sub_role.track("video"); + track.frame(); + track.bytes(50); + + let entries = stats.inner.entries.lock(); + let root = entries.get(&PathOwned::from("")).expect("root level"); + assert_eq!(root.subscriber.frames.load(Relaxed), 1); + assert_eq!(root.subscriber.bytes.load(Relaxed), 50); + assert_eq!(root.subscriber.broadcasts.load(Relaxed), 1); + assert_eq!(root.subscriber.subscriptions.load(Relaxed), 1); + assert_eq!(root.publisher.bytes.load(Relaxed), 0); + } + + #[tokio::test] + async fn bumps_fanout_to_all_levels() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let stats = Stats::new("use", 2, origin); + let bs = stats.broadcast("demo/bbb"); + let p = bs.publisher(); + let track = p.track("video"); + track.bytes(100); + + let entries = stats.inner.entries.lock(); + let root = entries.get(&PathOwned::from("")).expect("root level"); + let demo = entries.get(&PathOwned::from("demo")).expect("demo level"); + assert_eq!(root.publisher.bytes.load(Relaxed), 100); + assert_eq!(demo.publisher.bytes.load(Relaxed), 100); + } + + #[tokio::test] + async fn hidden_paths_are_no_op() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let stats = Stats::new("use", 2, origin); + let bs = stats.broadcast(".stats/use"); + assert!(bs.is_empty()); + + let p = bs.publisher(); + let track = p.track("video"); + track.bytes(100); + track.frame(); + track.group(); + drop(track); + drop(p); + + assert!(stats.inner.entries.lock().is_empty()); + } + + #[tokio::test] + async fn task_spawns_on_first_subscribe_and_announces() { + tokio::time::pause(); + + let origin 
= Origin::random().produce(); + let stats = Stats::new("use", 1, origin.clone()); + let mut consumer = origin.consume(); + + let bs = stats.broadcast("foo/bar"); + let p = bs.publisher(); + let _track = p.track("video"); + + tokio::time::advance(Duration::from_millis(1)).await; + let (path, broadcast) = consumer.announced_all().await.expect("expected announce"); + assert_eq!(path, Path::new(".stats/use")); + assert!(broadcast.is_some()); + } + + #[tokio::test] + async fn task_exits_when_all_roles_idle() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let stats = Stats::new("use", 1, origin.clone()); + let mut consumer = origin.consume(); + + let bs = stats.broadcast("foo/bar"); + let p = bs.publisher(); + let track = p.track("video"); + + tokio::time::advance(Duration::from_millis(1)).await; + let (_, broadcast) = consumer.announced_all().await.expect("expected announce"); + assert!(broadcast.is_some()); + + drop(track); + drop(p); + drop(bs); + + tokio::time::advance(Duration::from_secs(2)).await; + let (path, broadcast) = consumer.announced_all().await.expect("expected unannounce"); + assert_eq!(path, Path::new(".stats/use")); + assert!(broadcast.is_none()); + } + + #[tokio::test] + async fn two_stats_handles_are_independent() { + tokio::time::pause(); + + let origin = Origin::random().produce(); + let external = Stats::new("external", 1, origin.clone()); + let internal = Stats::new("internal", 1, origin.clone()); + let mut consumer = origin.consume(); + + let ext_bs = external.broadcast("foo/bar"); + let int_bs = internal.broadcast("foo/bar"); + let ext_p = ext_bs.publisher(); + let int_p = int_bs.publisher(); + let _ext_track = ext_p.track("video"); + let _int_track = int_p.track("video"); + + // Both stats handles should announce their own broadcast. 
+ let mut seen = std::collections::HashSet::new(); + tokio::time::advance(Duration::from_millis(1)).await; + for _ in 0..2 { + let (path, broadcast) = consumer.announced_all().await.expect("expected announce"); + assert!(broadcast.is_some()); + seen.insert(path.as_str().to_string()); + } + assert!(seen.contains(".stats/external")); + assert!(seen.contains(".stats/internal")); + } +} diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 3010796f4..448e1fd91 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -322,6 +322,12 @@ impl Client { self } + /// Attach a [`moq_lite::Stats`] aggregator to all sessions opened by this client. + pub fn with_stats(mut self, stats: impl Into>) -> Self { + self.moq = self.moq.with_stats(stats); + self + } + /// Start a background reconnect loop that connects to the given URL, /// waits for the session to close, then reconnects with exponential backoff. /// diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 53d8a637a..ba6858547 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -283,6 +283,12 @@ impl Server { self } + /// Attach a [`moq_lite::Stats`] aggregator to all sessions accepted by this server. + pub fn with_stats(mut self, stats: impl Into>) -> Self { + self.moq = self.moq.with_stats(stats); + self + } + // Return the SHA256 fingerprints of all our certificates. pub fn tls_info(&self) -> Arc> { #[cfg(feature = "noq")] @@ -586,6 +592,12 @@ impl Request { self } + /// Attach a [`moq_lite::Stats`] aggregator to this session. + pub fn with_stats(mut self, stats: impl Into>) -> Self { + self.server = self.server.with_stats(stats); + self + } + /// Accept the session, performing rest of the MoQ handshake. 
pub async fn ok(self) -> anyhow::Result { match self.kind { diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 417ac3e3e..31cb430c1 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -1,10 +1,10 @@ use std::path::PathBuf; use anyhow::Context; -use moq_lite::{Origin, OriginConsumer, OriginProducer}; +use moq_lite::{Origin, OriginConsumer, OriginProducer, Stats}; use url::Url; -use crate::AuthToken; +use crate::{AuthToken, StatsConfig}; /// Configuration for relay clustering. /// @@ -48,14 +48,39 @@ pub struct Cluster { /// All broadcasts, local and remote. Downstream sessions read from here /// (filtered by their auth token) and remote dials both read and write here. pub origin: OriginProducer, + + /// Optional stats aggregators for external (non-mTLS) and internal (mTLS / + /// cluster peer) traffic. Each is an independent [`Stats`] handle publishing + /// its own `.stats//` broadcasts; the connection-acceptance path + /// picks one based on whether the peer authenticated via mTLS. + pub stats_external: Option, + pub stats_internal: Option, } impl Cluster { /// Creates a new cluster with the given configuration and QUIC client. 
- pub fn new(config: ClusterConfig, client: moq_native::Client) -> Self { + pub fn new(config: ClusterConfig, stats_config: StatsConfig, client: moq_native::Client) -> Self { let origin = Origin::random().produce(); tracing::info!(origin_id = %origin.id, "cluster initialized"); - Cluster { config, client, origin } + let levels = stats_config.levels.max(1); + let stats_external = stats_config + .name + .as_ref() + .map(|name| Stats::new(name.clone(), levels, origin.clone())); + let stats_internal = stats_config + .name + .as_ref() + .map(|name| Stats::new(format!("{name}_internal"), levels, origin.clone())); + if let Some(name) = stats_config.name.as_ref() { + tracing::info!(name, levels = stats_config.levels, "stats publishing enabled"); + } + Cluster { + config, + client, + origin, + stats_external, + stats_internal, + } } /// Returns an [`OriginConsumer`] scoped to this session's subscribe permissions. @@ -145,11 +170,13 @@ impl Cluster { log_url.set_query(None); tracing::info!(url = %log_url, "dialing cluster peer"); + // Cluster-to-cluster traffic is internal by definition. let session = self .client .clone() .with_publish(self.origin.consume()) .with_consume(self.origin.clone()) + .with_stats(self.stats_internal.clone()) .connect(url.clone()) .await .context("failed to connect to cluster peer")?; diff --git a/rs/moq-relay/src/config.rs b/rs/moq-relay/src/config.rs index 465fc59cb..31b485811 100644 --- a/rs/moq-relay/src/config.rs +++ b/rs/moq-relay/src/config.rs @@ -1,7 +1,7 @@ use clap::Parser; use serde::{Deserialize, Serialize}; -use crate::{AuthConfig, ClusterConfig, WebConfig}; +use crate::{AuthConfig, ClusterConfig, StatsConfig, WebConfig}; /// Top-level relay configuration, loadable from CLI arguments, environment /// variables, or a TOML file. @@ -40,6 +40,11 @@ pub struct Config { #[serde(default)] pub web: WebConfig, + /// Stats publishing configuration. Disabled when `stats.name` is unset. 
+ #[command(flatten)] + #[serde(default)] + pub stats: StatsConfig, + /// If provided, load the configuration from this file. #[serde(default)] pub file: Option, diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 1df710087..d86521e99 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -40,8 +40,8 @@ impl Connection { /// Authenticates and serves this connection until it closes. #[tracing::instrument("conn", skip_all, fields(id = self.id))] pub async fn run(self) -> anyhow::Result<()> { - let token = match self.authenticate().await { - Ok(token) => token, + let (token, internal) = match self.authenticate().await { + Ok(out) => out, Err(err) => { let _ = self.request.close(err.status.as_u16()).await; return Err(err.source); @@ -54,13 +54,13 @@ impl Connection { match (&publish, &subscribe) { (Some(publish), Some(subscribe)) => { - tracing::info!(transport, root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), "session accepted"); + tracing::info!(transport, internal, root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), "session accepted"); } (Some(publish), None) => { - tracing::info!(transport, root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), "publisher accepted"); + tracing::info!(transport, internal, root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), "publisher accepted"); } (None, Some(subscribe)) => { - tracing::info!(transport, root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), "subscriber accepted") + tracing::info!(transport, internal, root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), 
"subscriber accepted") } _ => { let _ = self.request.close(http::StatusCode::FORBIDDEN.as_u16()).await; @@ -68,6 +68,15 @@ impl Connection { } } + // mTLS-authenticated peers (including other cluster nodes) report through + // the internal stats handle so a billing service can rate-differentiate + // from external traffic. + let stats = if internal { + self.cluster.stats_internal.clone() + } else { + self.cluster.stats_external.clone() + }; + // Accept the connection. // NOTE: subscribe and publish seem backwards because of how relays work. // We publish the tracks the client is allowed to subscribe to. @@ -76,8 +85,7 @@ impl Connection { .request .with_publish(subscribe) .with_consume(publish) - // TODO: Uncomment when observability feature is merged - // .with_stats(stats) + .with_stats(stats) .ok() .await?; @@ -89,13 +97,14 @@ impl Connection { } /// Resolve an [`AuthToken`] from the request's URL and (optional) mTLS peer - /// identity. Any failure is returned as a [`StatusError`] so [`run`] can - /// close the request with the mapped HTTP status exactly once. + /// identity, plus an `internal` flag indicating whether the peer authenticated + /// via mTLS. Any failure is returned as a [`StatusError`] so [`run`] can close + /// the request with the mapped HTTP status exactly once. /// /// If the client presented a valid mTLS client certificate, JWT is skipped /// and full (cluster) access is granted. The cert's chain to the configured /// CA is the only credential we require. 
- async fn authenticate(&self) -> Result<AuthToken, StatusError> { + async fn authenticate(&self) -> Result<(AuthToken, bool), StatusError> { let peer = self.request.peer_identity().map_err(|source| StatusError { status: http::StatusCode::FORBIDDEN, source, @@ -103,13 +112,13 @@ impl Connection { if peer.is_some() { tracing::debug!("mTLS peer authenticated"); - return Ok(AuthToken::unrestricted()); + return Ok((AuthToken::unrestricted(), true)); } let params = match self.request.url() { Some(url) => self.auth.params_from_url(url), None => AuthParams::default(), }; - Ok(self.auth.verify(&params).await?) + Ok((self.auth.verify(&params).await?, false)) } } diff --git a/rs/moq-relay/src/lib.rs b/rs/moq-relay/src/lib.rs index 455c4fc4c..f160de0de 100644 --- a/rs/moq-relay/src/lib.rs +++ b/rs/moq-relay/src/lib.rs @@ -11,6 +11,7 @@ mod auth; mod cluster; mod config; mod connection; +mod stats; mod web; #[cfg(feature = "websocket")] mod websocket; @@ -23,4 +24,5 @@ pub use auth::*; pub use cluster::*; pub use config::*; pub use connection::*; +pub use stats::*; pub use web::*; diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index ed78c5ba9..138ae3f8c 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -49,7 +49,7 @@ async fn main() -> anyhow::Result<()> { config.auth.init().await? }; - let cluster = Cluster::new(config.cluster, client); + let cluster = Cluster::new(config.cluster, config.stats, client); // Create a web server too. mTLS for HTTPS is opt-in via `--web-https-root`. let web = Web::new( diff --git a/rs/moq-relay/src/stats.rs b/rs/moq-relay/src/stats.rs new file mode 100644 index 000000000..0929372a2 --- /dev/null +++ b/rs/moq-relay/src/stats.rs @@ -0,0 +1,33 @@ +//! Relay-side stats configuration. +//! +//! The actual aggregator lives in [`moq_lite::Stats`]; this module just +//! holds the relay-specific config knobs. + +use clap::Args; +use serde::{Deserialize, Serialize}; + +/// Configuration for the relay's stats publishing.
+/// +/// Stats are disabled when `name` is unset. When configured, the relay attaches +/// a [`moq_lite::Stats`] aggregator to every session it accepts (and every cluster +/// dial), which publishes `.stats/<level>/<name>` broadcasts on the cluster origin. +/// Each level only advertises while at least one role on that level has an active +/// subscription. +#[derive(Args, Clone, Debug, Default, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +#[non_exhaustive] +#[group(id = "stats-config")] +pub struct StatsConfig { + /// Identifier baked into advertised stats paths (`.stats/<level>/<name>`). + /// Stats are disabled when unset. + #[arg(long = "stats-name", env = "MOQ_STATS_NAME")] + pub name: Option<String>, + + /// How many path-prefix levels to bucket stats by. + /// + /// `1` produces only the root bucket (`.stats/<name>`). `2` adds a per-first-segment + /// bucket (e.g. `.stats/demo/<name>` for broadcasts under `demo/*`). Levels deeper + /// than the broadcast path's segment count are skipped. + #[arg(long = "stats-levels", env = "MOQ_STATS_LEVELS", default_value_t = 1)] + pub levels: u32, +} diff --git a/rs/moq-relay/src/websocket.rs b/rs/moq-relay/src/websocket.rs index 8bca2de3c..07fd2c739 100644 --- a/rs/moq-relay/src/websocket.rs +++ b/rs/moq-relay/src/websocket.rs @@ -15,7 +15,7 @@ use axum::{ http::StatusCode, response::Response, }; -use moq_lite::{OriginConsumer, OriginProducer}; +use moq_lite::{OriginConsumer, OriginProducer, Stats}; use crate::{AuthParams, AuthToken, WebState, web::AuthQuery, web::MtlsPeer, web::landing_response}; @@ -35,13 +35,20 @@ pub(crate) async fn serve_ws( let ws = ws.protocols(["webtransport"]); let params = AuthParams { path, jwt: query.jwt }; - let token = if mtls.is_some() { + let internal = mtls.is_some(); + let token = if internal { AuthToken::unrestricted() } else { state.auth.verify(&params).await?
}; let publish = state.cluster.publisher(&token); let subscribe = state.cluster.subscriber(&token); + // mTLS sessions record on the internal stats handle; everything else on external. + let stats = if internal { + state.cluster.stats_internal.clone() + } else { + state.cluster.stats_external.clone() + }; if publish.is_none() && subscribe.is_none() { // Bad token, we can't publish or subscribe. @@ -61,7 +68,7 @@ pub(crate) async fn serve_ws( tungstenite::Error::ConnectionClosed }) .with(tungstenite_to_axum); - let _ = handle_socket(id, socket, publish, subscribe).await; + let _ = handle_socket(id, socket, publish, subscribe, stats).await; })) } @@ -71,6 +78,7 @@ async fn handle_socket( socket: T, publish: Option, subscribe: Option, + stats: Option, ) -> anyhow::Result<()> where T: futures::Stream> @@ -84,8 +92,7 @@ where let session = moq_lite::Server::new() .with_publish(subscribe) .with_consume(publish) - // TODO: Uncomment when observability feature is merged - // .with_stats(stats) + .with_stats(stats) .accept(ws) .await?; session.closed().await.map_err(Into::into)