Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rs/libmoq/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Origin {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
// TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait
// for gossip instead of racing it.
origin.consume().try_consume_broadcast(path).ok_or(Error::BroadcastNotFound)
origin.consume().get_broadcast(path).ok_or(Error::BroadcastNotFound)
}

pub fn publish<P: moq_lite::AsPath>(
Expand Down
5 changes: 3 additions & 2 deletions rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ async fn main() -> anyhow::Result<()> {

let path: moq_lite::Path<'_> = config.broadcast.into();
let mut origin = origin
.consume_only(&[path])
.context("not allowed to consume broadcast")?;
.scope(&[path])
.context("not allowed to consume broadcast")?
.consume();

// The current subscriber if any, dropped after each announce.
let mut clock: Option<clock::Subscriber> = None;
Expand Down
7 changes: 2 additions & 5 deletions rs/moq-lite/src/ietf/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

// We just received a subscribe for this exact namespace, so the peer must have already
// seen the announcement — synchronous lookup is appropriate here.
let Some(broadcast) = self.origin.try_consume_broadcast(&msg.track_namespace) else {
let Some(broadcast) = self.origin.get_broadcast(&msg.track_namespace) else {
self.write_subscribe_error(&mut stream.writer, request_id, 404, "Broadcast not found")
.await?;
return Ok(());
Expand Down Expand Up @@ -548,10 +548,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

tracing::debug!(prefix = %self.origin.absolute(&prefix), "subscribe_namespace stream");

let mut origin = self
.origin
.consume_only(&[prefix.as_path()])
.ok_or(Error::Unauthorized)?;
let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?;

// Send OK response
match self.version {
Expand Down
7 changes: 2 additions & 5 deletions rs/moq-lite/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
let interest = stream.reader.decode::<lite::AnnounceInterest>().await?;
let prefix = interest.prefix.to_owned();

let mut origin = self
.origin
.consume_only(&[prefix.as_path()])
.ok_or(Error::Unauthorized)?;
let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?;

let version = self.version;
let self_origin = self.self_origin;
Expand Down Expand Up @@ -246,7 +243,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

// We just received a subscribe for this exact path, so by definition the peer has
// already seen an announcement for it — synchronous lookup is appropriate here.
let broadcast = self.origin.try_consume_broadcast(&subscribe.broadcast);
let broadcast = self.origin.get_broadcast(&subscribe.broadcast);
let priority = self.priority.clone();
let version = self.version;

Expand Down
14 changes: 14 additions & 0 deletions rs/moq-lite/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,20 @@ impl BroadcastConsumer {
self.state.read().abort.clone().unwrap_or(Error::Dropped)
}

/// Returns true if every [`BroadcastProducer`] has been dropped.
pub fn is_closed(&self) -> bool {
self.state.read().is_closed()
}

/// Register a [`conducer::Waiter`] that fires when the broadcast closes.
///
/// Returns [`Poll::Ready`] if already closed, otherwise [`Poll::Pending`] after
/// arming the waiter. Useful for composing close-detection into a larger poll
/// without spawning a task per broadcast.
pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<()> {
self.state.poll_closed(waiter)
}

/// Check if this is the exact same instance of a broadcast.
pub fn is_clone(&self, other: &Self) -> bool {
self.state.same_channel(&other.state)
Expand Down
Loading
Loading