Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions rs/moq-lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

[features]
serde = ["dep:serde"]
# Retained for backwards compatibility; serde is now always available.
serde = []

[dependencies]
bytes = "1"
conducer = { workspace = true }
futures = "0.3"
num_enum = "0.7"
rand = "0.9.2"
serde = { workspace = true, optional = true, features = ["rc"] }
serde = { workspace = true, features = ["rc"] }
serde_json = "1"
thiserror = "2"
tokio = { workspace = true, features = ["macros", "io-util", "sync", "test-util", "time"] }
tracing = "0.1"
Expand Down
13 changes: 12 additions & 1 deletion rs/moq-lite/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED,
OriginConsumer, OriginProducer, Session, Version, Versions,
OriginConsumer, OriginProducer, Session, Stats, Version, Versions,
coding::{self, Decode, Encode, Stream},
ietf, lite, setup,
};
Expand All @@ -10,6 +10,7 @@ use crate::{
pub struct Client {
publish: Option<OriginConsumer>,
consume: Option<OriginProducer>,
stats: Option<Stats>,
versions: Versions,
}

Expand All @@ -28,6 +29,13 @@ impl Client {
self
}

/// Attach a [`Stats`] aggregator. Per-broadcast and per-subscription counters
/// will be bumped through this handle for the lifetime of the session.
pub fn with_stats(mut self, stats: impl Into<Option<Stats>>) -> Self {
self.stats = stats.into();
self
}

/// Set both publish and consume from an `OriginProducer`.
///
/// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`.
Expand Down Expand Up @@ -121,6 +129,7 @@ impl Client {
None,
self.publish.clone(),
self.consume.clone(),
self.stats.clone(),
lite::Version::Lite04,
)?;

Expand All @@ -137,6 +146,7 @@ impl Client {
None,
self.publish.clone(),
self.consume.clone(),
self.stats.clone(),
lite::Version::Lite03,
)?;

Expand Down Expand Up @@ -182,6 +192,7 @@ impl Client {
Some(stream),
self.publish.clone(),
self.consume.clone(),
self.stats.clone(),
v,
)?
}
Expand Down
2 changes: 2 additions & 0 deletions rs/moq-lite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ mod path;
mod server;
mod session;
mod setup;
mod stats;
mod version;

pub use client::*;
Expand All @@ -65,6 +66,7 @@ pub use model::*;
pub use path::*;
pub use server::*;
pub use session::*;
pub use stats::*;
pub use version::*;

// Re-export the bytes crate
Expand Down
91 changes: 79 additions & 12 deletions rs/moq-lite/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use web_async::FuturesExt;
use web_transport_trait::Stats;

use crate::{
AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, Track, TrackConsumer,
AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, Stats as MoqStats, Track, TrackConsumer,
coding::{Stream, Writer},
lite::{
self,
Expand All @@ -16,26 +16,38 @@ use crate::{

use super::Version;

pub(super) struct PublisherConfig {
/// The origin we read local broadcasts from. None gives this session a
/// dummy, immediately-closed origin (i.e. nothing to publish).
pub origin: Option<OriginConsumer>,
/// Optional stats aggregator for this session's egress.
pub stats: Option<MoqStats>,
/// Per-session origin id stamped onto outbound announce hops. Shared with
/// the matching `Subscriber` so reflected announces can be filtered.
pub self_origin: Origin,
pub version: Version,
}

pub(super) struct Publisher<S: web_transport_trait::Session> {
session: S,
origin: OriginConsumer,
// The session-level origin id stamped onto outbound hop chains. Shared
// with the Subscriber so it can optionally filter out reflected announces.
stats: Option<MoqStats>,
self_origin: Origin,
priority: PriorityQueue,
version: Version,
}

impl<S: web_transport_trait::Session> Publisher<S> {
pub fn new(session: S, origin: Option<OriginConsumer>, self_origin: Origin, version: Version) -> Self {
pub fn new(session: S, config: PublisherConfig) -> Self {
// Default to a dummy origin that is immediately closed.
let origin = origin.unwrap_or_else(|| Origin::random().produce().consume());
let origin = config.origin.unwrap_or_else(|| Origin::random().produce().consume());
Self {
session,
origin,
self_origin,
stats: config.stats,
self_origin: config.self_origin,
priority: Default::default(),
version,
version: config.version,
}
}

Expand Down Expand Up @@ -133,8 +145,9 @@ impl<S: web_transport_trait::Session> Publisher<S> {

let version = self.version;
let self_origin = self.self_origin;
let stats = self.stats.clone();
web_async::spawn(async move {
if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix, self_origin, version).await {
if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix, self_origin, stats, version).await {
match &err {
Error::Cancel | Error::Transport(_) => {
tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled");
Expand All @@ -156,10 +169,17 @@ impl<S: web_transport_trait::Session> Publisher<S> {
origin: &mut OriginConsumer,
prefix: impl AsPath,
self_origin: Origin,
stats: Option<MoqStats>,
version: Version,
) -> Result<(), Error> {
let prefix = prefix.as_path();

// Per-path stats guards: dropping the guard records `broadcasts_closed`.
// The origin contract guarantees announce/unannounce toggles per path, so a
// new active announcement must always be for a path with no live guard.
let mut stats_guards: std::collections::HashMap<crate::PathOwned, crate::PublisherStats> =
std::collections::HashMap::new();

match version {
Version::Lite01 | Version::Lite02 => {
let mut init = Vec::new();
Expand All @@ -171,10 +191,17 @@ impl<S: web_transport_trait::Session> Publisher<S> {

if active.is_some() {
tracing::debug!(broadcast = %origin.absolute(&path), "announce");
if let Some(stats) = stats.as_ref() {
let absolute = origin.absolute(&path).to_owned();
let guard = stats.broadcast(&absolute).publisher();
let prev = stats_guards.insert(absolute, guard);
debug_assert!(prev.is_none(), "origin announced a path that was already active");
}
init.push(suffix.to_owned());
} else {
// A potential race.
tracing::debug!(broadcast = %origin.absolute(&path), "unannounce");
stats_guards.remove(&origin.absolute(&path).to_owned());
init.retain(|path| path != &suffix);
}
}
Expand Down Expand Up @@ -210,10 +237,17 @@ impl<S: web_transport_trait::Session> Publisher<S> {
);
continue;
}
if let Some(stats) = stats.as_ref() {
let absolute = origin.absolute(&path).to_owned();
let guard = stats.broadcast(&absolute).publisher();
let prev = stats_guards.insert(absolute, guard);
debug_assert!(prev.is_none(), "origin announced a path that was already active");
}
let msg = lite::Announce::Active { suffix, hops };
stream.writer.encode(&msg).await?;
} else {
tracing::debug!(broadcast = %origin.absolute(&path), "unannounce");
stats_guards.remove(&origin.absolute(&path).to_owned());
// An ended announce doesn't need hops — the receiver matches on path only.
let msg = lite::Announce::Ended {
suffix,
Expand Down Expand Up @@ -247,9 +281,24 @@ impl<S: web_transport_trait::Session> Publisher<S> {
let priority = self.priority.clone();
let version = self.version;

// Open a publisher-track stats guard tied to this subscription's lifetime.
let track_stats = self
.stats
.as_ref()
.map(|stats| stats.broadcast(&absolute).publisher().track(&track));
Comment thread
coderabbitai[bot] marked this conversation as resolved.

let session = self.session.clone();
web_async::spawn(async move {
if let Err(err) = Self::run_subscribe(session, &mut stream, &subscribe, broadcast, priority, version).await
if let Err(err) = Self::run_subscribe(
session,
&mut stream,
&subscribe,
broadcast,
priority,
track_stats,
version,
)
.await
{
match &err {
// TODO better classify WebTransport errors.
Expand All @@ -275,6 +324,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
subscribe: &lite::Subscribe<'_>,
consumer: Option<BroadcastConsumer>,
priority: PriorityQueue,
track_stats: Option<crate::PublisherTrack>,
version: Version,
) -> Result<(), Error> {
let track = Track {
Expand All @@ -297,8 +347,9 @@ impl<S: web_transport_trait::Session> Publisher<S> {

stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?;

let track_stats = track_stats.map(std::sync::Arc::new);
tokio::select! {
res = Self::run_track(session, track, subscribe, priority, version) => res?,
res = Self::run_track(session, track, subscribe, priority, track_stats, version) => res?,
res = stream.reader.closed() => res?,
}

Expand All @@ -311,6 +362,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
mut track: TrackConsumer,
subscribe: &lite::Subscribe<'_>,
priority: PriorityQueue,
track_stats: Option<std::sync::Arc<crate::PublisherTrack>>,
version: Version,
) -> Result<(), Error> {
let mut tasks = FuturesUnordered::new();
Expand Down Expand Up @@ -340,7 +392,9 @@ impl<S: web_transport_trait::Session> Publisher<S> {
};

let priority = priority.insert(track.priority, sequence);
tasks.push(Self::serve_group(session.clone(), msg, priority, group, version).map(|_| ()));
tasks.push(
Self::serve_group(session.clone(), msg, priority, group, track_stats.clone(), version).map(|_| ()),
);
}
}

Expand All @@ -349,6 +403,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
msg: lite::Group,
mut priority: PriorityHandle,
mut group: GroupConsumer,
track_stats: Option<std::sync::Arc<crate::PublisherTrack>>,
version: Version,
) -> Result<(), Error> {
// TODO add a way to open in priority order.
Expand All @@ -358,6 +413,9 @@ impl<S: web_transport_trait::Session> Publisher<S> {
stream.set_priority(priority.current());
stream.encode(&lite::DataType::Group).await?;
stream.encode(&msg).await?;
if let Some(s) = track_stats.as_deref() {
s.group();
}

loop {
let frame = tokio::select! {
Expand All @@ -377,6 +435,9 @@ impl<S: web_transport_trait::Session> Publisher<S> {
};

stream.encode(&frame.size).await?;
if let Some(s) = track_stats.as_deref() {
s.frame();
}

loop {
let chunk = tokio::select! {
Expand All @@ -391,7 +452,13 @@ impl<S: web_transport_trait::Session> Publisher<S> {
};

match chunk? {
Some(mut chunk) => stream.write_all(&mut chunk).await?,
Some(mut chunk) => {
let n = chunk.len() as u64;
stream.write_all(&mut chunk).await?;
if let Some(s) = track_stats.as_deref() {
s.bytes(n);
}
}
None => break,
}
}
Expand Down
29 changes: 24 additions & 5 deletions rs/moq-lite/src/lite/session.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
BandwidthConsumer, BandwidthProducer, Error, Origin, OriginConsumer, OriginProducer, coding::Stream,
BandwidthConsumer, BandwidthProducer, Error, Origin, OriginConsumer, OriginProducer, Stats, coding::Stream,
lite::SessionInfo,
};

use super::{Publisher, Subscriber, Version};
use super::{Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version};
pub fn start<S: web_transport_trait::Session>(
session: S,
// The stream used to setup the session, after exchanging setup messages.
Expand All @@ -13,6 +13,8 @@ pub fn start<S: web_transport_trait::Session>(
publish: Option<OriginConsumer>,
// We will consume any remote broadcasts, inserting them into this origin.
subscribe: Option<OriginProducer>,
// Optional stats aggregator. None disables instrumentation for this session.
stats: Option<Stats>,
// The version of the protocol to use.
version: Version,
) -> Result<Option<BandwidthConsumer>, Error> {
Expand All @@ -31,9 +33,26 @@ pub fn start<S: web_transport_trait::Session>(
// Shared per-session origin: the publisher stamps it onto outbound
// announce hops, and the subscriber carries it so callers can opt into
// filtering out their own reflected announces.
let origin = Origin::random();
let publisher = Publisher::new(session.clone(), publish, origin, version);
let subscriber = Subscriber::new(session.clone(), subscribe, recv_bw_for_sub, origin, version);
let self_origin = Origin::random();
let publisher = Publisher::new(
session.clone(),
PublisherConfig {
origin: publish,
stats: stats.clone(),
self_origin,
version,
},
);
let subscriber = Subscriber::new(
session.clone(),
SubscriberConfig {
origin: subscribe,
recv_bandwidth: recv_bw_for_sub,
stats,
self_origin,
version,
},
);

web_async::spawn(async move {
let res = tokio::select! {
Expand Down
Loading
Loading