From 10b5abe95bd6dc1adca00e9e73c30da935848421 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 5 May 2026 11:30:41 -0700 Subject: [PATCH] moq-lite: port Origin API renames from #1358 (without the new state model) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename OriginProducer::publish_only / OriginConsumer::consume_only → scope. - Drop OriginProducer::consume_only and OriginProducer::try_consume_broadcast; callers go through producer.consume().scope(..) / .get_broadcast(..) so the producer surface is publish-only. - Rename OriginConsumer::try_consume_broadcast → get_broadcast (the consumer receiver and try_ prefix were both redundant). - Add BroadcastConsumer::is_closed and poll_closed (thin wrappers over conducer::Consumer) so callers can compose close-detection without spawning a task per broadcast. - Migrate every workspace caller (lite/ietf publishers, libmoq, moq-clock, moq-relay). Keeps the existing OriginNode / mpsc / web_async::spawn implementation, the OriginAnnounce tuple, and the announced/try_announced/announced_broadcast naming. Suffix churn (publish_broadcast → publish, etc.) is intentionally out of scope here — it should be a separate crate-wide pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/libmoq/src/origin.rs | 2 +- rs/moq-clock/src/main.rs | 5 +- rs/moq-lite/src/ietf/publisher.rs | 7 +- rs/moq-lite/src/lite/publisher.rs | 7 +- rs/moq-lite/src/model/broadcast.rs | 14 ++ rs/moq-lite/src/model/origin.rs | 218 ++++++++++++++--------------- rs/moq-relay/src/cluster.rs | 4 +- 7 files changed, 130 insertions(+), 127 deletions(-) diff --git a/rs/libmoq/src/origin.rs b/rs/libmoq/src/origin.rs index 0bbc909c2..e561c779a 100644 --- a/rs/libmoq/src/origin.rs +++ b/rs/libmoq/src/origin.rs @@ -108,7 +108,7 @@ impl Origin { let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; // TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait // for gossip instead of racing it. - origin.consume().try_consume_broadcast(path).ok_or(Error::BroadcastNotFound) + origin.consume().get_broadcast(path).ok_or(Error::BroadcastNotFound) } pub fn publish( diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index f3ecec8e1..ca54afcbe 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -87,8 +87,9 @@ async fn main() -> anyhow::Result<()> { let path: moq_lite::Path<'_> = config.broadcast.into(); let mut origin = origin - .consume_only(&[path]) - .context("not allowed to consume broadcast")?; + .scope(&[path]) + .context("not allowed to consume broadcast")? + .consume(); // The current subscriber if any, dropped after each announce. let mut clock: Option = None; diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 7c4239b2e..954a418c1 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -107,7 +107,7 @@ impl Publisher { // We just received a subscribe for this exact namespace, so the peer must have already // seen the announcement — synchronous lookup is appropriate here. - let Some(broadcast) = self.origin.try_consume_broadcast(&msg.track_namespace) else { + let Some(broadcast) = self.origin.get_broadcast(&msg.track_namespace) else { self.write_subscribe_error(&mut stream.writer, request_id, 404, "Broadcast not found") .await?; return Ok(()); @@ -548,10 +548,7 @@ impl Publisher { tracing::debug!(prefix = %self.origin.absolute(&prefix), "subscribe_namespace stream"); - let mut origin = self - .origin - .consume_only(&[prefix.as_path()]) - .ok_or(Error::Unauthorized)?; + let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?; // Send OK response match self.version { diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 1022f05c2..73d6515e5 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -129,10 +129,7 @@ impl Publisher { let interest = stream.reader.decode::().await?; let prefix = interest.prefix.to_owned(); - let mut origin = self - .origin - .consume_only(&[prefix.as_path()]) - .ok_or(Error::Unauthorized)?; + let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?; let version = self.version; let self_origin = self.self_origin; @@ -246,7 +243,7 @@ impl Publisher { // We just received a subscribe for this exact path, so by definition the peer has // already seen an announcement for it — synchronous lookup is appropriate here. - let broadcast = self.origin.try_consume_broadcast(&subscribe.broadcast); + let broadcast = self.origin.get_broadcast(&subscribe.broadcast); let priority = self.priority.clone(); let version = self.version; diff --git a/rs/moq-lite/src/model/broadcast.rs b/rs/moq-lite/src/model/broadcast.rs index 1871480f1..a74ecb6bd 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -382,6 +382,20 @@ impl BroadcastConsumer { self.state.read().abort.clone().unwrap_or(Error::Dropped) } + /// Returns true if every [`BroadcastProducer`] has been dropped. + pub fn is_closed(&self) -> bool { + self.state.read().is_closed() + } + + /// Register a [`conducer::Waiter`] that fires when the broadcast closes. + /// + /// Returns [`Poll::Ready`] if already closed, otherwise [`Poll::Pending`] after + /// arming the waiter. Useful for composing close-detection into a larger poll + /// without spawning a task per broadcast. + pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<()> { + self.state.poll_closed(waiter) + } + /// Check if this is the exact same instance of a broadcast. pub fn is_clone(&self, other: &Self) -> bool { self.state.same_channel(&other.state) diff --git a/rs/moq-lite/src/model/origin.rs b/rs/moq-lite/src/model/origin.rs index 7ea43e04e..fd3484b21 100644 --- a/rs/moq-lite/src/model/origin.rs +++ b/rs/moq-lite/src/model/origin.rs @@ -102,7 +102,7 @@ pub(crate) const MAX_HOPS: usize = 32; #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct OriginList(Vec); -/// Returned when an operation would grow an [`OriginList`] past [`MAX_HOPS`]. +/// Returned when an operation would grow an [`OriginList`] past its hop-count cap. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[non_exhaustive] pub struct TooManyOrigins; @@ -637,11 +637,12 @@ impl OriginProducer { true } - /// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes. + /// Returns a new OriginProducer restricted to publishing under one of `prefixes`. /// - /// Returns None if there are no legal prefixes. + /// Returns None if there are no legal prefixes (the requested prefixes are + /// disjoint from this producer's current scope). // TODO accept PathPrefixes instead of &[Path] - pub fn publish_only(&self, prefixes: &[Path]) -> Option { + pub fn scope(&self, prefixes: &[Path]) -> Option { let prefixes = PathPrefixes::new(prefixes); Some(OriginProducer { info: self.info, @@ -655,34 +656,10 @@ impl OriginProducer { OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) } - /// Subscribe to all announced broadcasts matching the prefix. - /// - /// Returns None if there are no legal prefixes. - // TODO accept PathPrefixes instead of &[Path] - pub fn consume_only(&self, prefixes: &[Path]) -> Option { - let prefixes = PathPrefixes::new(prefixes); - Some(OriginConsumer::new( - self.info, - self.root.clone(), - self.nodes.select(&prefixes)?, - )) - } - - /// Get a broadcast by path if it has *already* been published. - /// - /// Returns `None` when the path is unknown right now. For producers that aggregate - /// announcements from remote origins (e.g. clusters), this races gossip — see - /// [`OriginConsumer::announced_broadcast`] for an async alternative. - pub fn try_consume_broadcast(&self, path: impl AsPath) -> Option { - let path = path.as_path(); - let (root, rest) = self.nodes.get(&path)?; - let state = root.lock(); - state.consume_broadcast(&rest) - } - /// Returns a new OriginProducer that automatically strips out the provided prefix. /// - /// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard. + /// Returns None if the provided root is not authorized; when [`Self::scope`] + /// was already used without a wildcard. pub fn with_root(&self, prefix: impl AsPath) -> Option { let prefix = prefix.as_path(); @@ -782,11 +759,11 @@ impl OriginConsumer { /// Get a broadcast by path if it has *already* been announced. /// /// Returns `None` when the path is unknown to this consumer right now. Synchronous - /// lookup races announcement gossip — freshly-connected consumers will see `None` + /// lookup races announcement gossip — a freshly-connected consumer will see `None` /// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`] /// (blocks until announced) unless you can guarantee the announcement has already /// landed (e.g. you're responding to an `announced()` callback). - pub fn try_consume_broadcast(&self, path: impl AsPath) -> Option { + pub fn get_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); let (root, rest) = self.nodes.get(&path)?; let state = root.lock(); @@ -799,16 +776,16 @@ impl OriginConsumer { /// is closed before the broadcast is announced. The returned broadcast may itself be closed /// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that. /// - /// Prefer this over [`Self::try_consume_broadcast`] when you know the exact path you want - /// but cannot guarantee the announcement has already been received. + /// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but + /// cannot guarantee the announcement has already been received. pub async fn announced_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); // Scope a fresh consumer down to this path so we only wake up for relevant announcements. - let mut consumer = self.consume_only(std::slice::from_ref(&path))?; + let mut consumer = self.scope(std::slice::from_ref(&path))?; - // consume_only keeps narrower permissions intact: if we ask for `foo` on a consumer limited - // to `foo/specific`, consume_only returns a consumer scoped to `foo/specific` — no + // `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited + // to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no // announcement at the exact path `foo` can ever arrive. Bail rather than loop forever. if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) { return None; @@ -816,7 +793,7 @@ impl OriginConsumer { loop { let (announced, broadcast) = consumer.announced().await?; - // consume_only narrows by prefix, but we only want an exact-path match. + // `scope` narrows by prefix, but we only want an exact-path match. if announced.as_path() == path { if let Some(broadcast) = broadcast { return Some(broadcast); @@ -825,11 +802,12 @@ impl OriginConsumer { } } - /// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes. + /// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`. /// - /// Returns None if there are no legal prefixes (would always return None). + /// Returns None if there are no legal prefixes (the requested prefixes are + /// disjoint from this consumer's current scope, so it would always return None). // TODO accept PathPrefixes instead of &[Path] - pub fn consume_only(&self, prefixes: &[Path]) -> Option { + pub fn scope(&self, prefixes: &[Path]) -> Option { let prefixes = PathPrefixes::new(prefixes); Some(OriginConsumer::new( self.info, @@ -840,7 +818,8 @@ impl OriginConsumer { /// Returns a new OriginConsumer that automatically strips out the provided prefix. /// - /// Returns None if the provided root is not authorized; when consume_only was already used without a wildcard. + /// Returns None if the provided root is not authorized; when [`Self::scope`] was + /// already used without a wildcard. pub fn with_root(&self, prefix: impl AsPath) -> Option { let prefix = prefix.as_path(); @@ -1034,7 +1013,7 @@ mod tests { origin.publish_broadcast("test", consumer1.clone()); origin.publish_broadcast("test", consumer2.clone()); origin.publish_broadcast("test", consumer3.clone()); - assert!(consumer.try_consume_broadcast("test").is_some()); + assert!(consumer.get_broadcast("test").is_some()); // On equal hop lengths, each new publish replaces the active and reannounces. consumer.assert_next("test", &consumer1); @@ -1049,7 +1028,7 @@ mod tests { // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(consumer.try_consume_broadcast("test").is_some()); + assert!(consumer.get_broadcast("test").is_some()); consumer.assert_next_wait(); // Drop the active, we should reannounce with the remaining backup. @@ -1058,7 +1037,7 @@ mod tests { // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(consumer.try_consume_broadcast("test").is_some()); + assert!(consumer.get_broadcast("test").is_some()); consumer.assert_next_none("test"); consumer.assert_next("test", &consumer1); @@ -1067,7 +1046,7 @@ mod tests { // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(consumer.try_consume_broadcast("test").is_none()); + assert!(consumer.get_broadcast("test").is_none()); consumer.assert_next_none("test"); consumer.assert_next_wait(); @@ -1083,20 +1062,20 @@ mod tests { origin.publish_broadcast("test", broadcast1.consume()); origin.publish_broadcast("test", broadcast2.consume()); - assert!(origin.try_consume_broadcast("test").is_some()); + assert!(origin.consume().get_broadcast("test").is_some()); // This is harder, dropping the new broadcast first. drop(broadcast2); // Wait for the cleanup async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.try_consume_broadcast("test").is_some()); + assert!(origin.consume().get_broadcast("test").is_some()); drop(broadcast1); // Wait for the cleanup async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.try_consume_broadcast("test").is_none()); + assert!(origin.consume().get_broadcast("test").is_none()); } #[tokio::test] @@ -1110,13 +1089,13 @@ mod tests { origin.publish_broadcast("test", broadcast.consume()); origin.publish_broadcast("test", broadcast.consume()); - assert!(origin.try_consume_broadcast("test").is_some()); + assert!(origin.consume().get_broadcast("test").is_some()); drop(broadcast); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.try_consume_broadcast("test").is_none()); + assert!(origin.consume().get_broadcast("test").is_none()); } // There was a tokio bug where only the first 127 broadcasts would be received instantly. #[tokio::test] @@ -1195,13 +1174,13 @@ mod tests { } #[tokio::test] - async fn test_publish_only_allows() { + async fn test_publish_scope_allows() { let origin = Origin::random().produce(); let broadcast = Broadcast::new().produce(); // Create a producer that can only publish to "allowed" paths let limited_producer = origin - .publish_only(&["allowed/path1".into(), "allowed/path2".into()]) + .scope(&["allowed/path1".into(), "allowed/path2".into()]) .expect("should create limited producer"); // Should be able to publish to allowed paths @@ -1216,15 +1195,15 @@ mod tests { } #[tokio::test] - async fn test_publish_only_empty() { + async fn test_publish_scope_empty() { let origin = Origin::random().produce(); // Creating a producer with no allowed paths should return None - assert!(origin.publish_only(&[]).is_none()); + assert!(origin.scope(&[]).is_none()); } #[tokio::test] - async fn test_consume_only_filters() { + async fn test_consume_scope_filters() { let origin = Origin::random().produce(); let broadcast1 = Broadcast::new().produce(); let broadcast2 = Broadcast::new().produce(); @@ -1239,7 +1218,8 @@ mod tests { // Create a consumer that only sees "allowed" paths let mut limited_consumer = origin - .consume_only(&["allowed".into()]) + .consume() + .scope(&["allowed".into()]) .expect("should create limited consumer"); // Should only receive broadcasts under "allowed" @@ -1254,7 +1234,7 @@ mod tests { } #[tokio::test] - async fn test_consume_only_multiple_prefixes() { + async fn test_consume_scope_multiple_prefixes() { let origin = Origin::random().produce(); let broadcast1 = Broadcast::new().produce(); let broadcast2 = Broadcast::new().produce(); @@ -1266,7 +1246,8 @@ mod tests { // Consumer that only sees "foo" and "bar" paths let mut limited_consumer = origin - .consume_only(&["foo".into(), "bar".into()]) + .consume() + .scope(&["foo".into(), "bar".into()]) .expect("should create limited consumer"); // Order depends on PathPrefixes canonical sort (lexicographic for same length) @@ -1276,7 +1257,7 @@ mod tests { } #[tokio::test] - async fn test_with_root_and_publish_only() { + async fn test_with_root_and_publish_scope() { let origin = Origin::random().produce(); let broadcast = Broadcast::new().produce(); @@ -1285,7 +1266,7 @@ mod tests { // Limit them to publish only to "bar" and "goop/pee" within /foo let limited_producer = foo_producer - .publish_only(&["bar".into(), "goop/pee".into()]) + .scope(&["bar".into(), "goop/pee".into()]) .expect("should create limited producer"); let mut consumer = origin.consume(); @@ -1309,7 +1290,7 @@ mod tests { } #[tokio::test] - async fn test_with_root_and_consume_only() { + async fn test_with_root_and_consume_scope() { let origin = Origin::random().produce(); let broadcast1 = Broadcast::new().produce(); let broadcast2 = Broadcast::new().produce(); @@ -1325,7 +1306,8 @@ mod tests { // Create consumer limited to "bar" and "goop/pee" within /foo let mut limited_consumer = foo_producer - .consume_only(&["bar".into(), "goop/pee".into()]) + .consume() + .scope(&["bar".into(), "goop/pee".into()]) .expect("should create limited consumer"); // Should only see allowed paths (without foo prefix) @@ -1340,7 +1322,7 @@ mod tests { // First limit the producer to specific paths let limited_producer = origin - .publish_only(&["allowed".into()]) + .scope(&["allowed".into()]) .expect("should create limited producer"); // Trying to create a root outside allowed paths should fail @@ -1381,21 +1363,22 @@ mod tests { // Create limited consumer let limited_consumer = origin - .consume_only(&["allowed".into()]) + .consume() + .scope(&["allowed".into()]) .expect("should create limited consumer"); // Should be able to get allowed broadcast - let result = limited_consumer.try_consume_broadcast("allowed/test"); + let result = limited_consumer.get_broadcast("allowed/test"); assert!(result.is_some()); assert!(result.unwrap().is_clone(&broadcast1.consume())); // Should not be able to get disallowed broadcast - assert!(limited_consumer.try_consume_broadcast("notallowed/test").is_none()); + assert!(limited_consumer.get_broadcast("notallowed/test").is_none()); // Original consumer can get both let consumer = origin.consume(); - assert!(consumer.try_consume_broadcast("allowed/test").is_some()); - assert!(consumer.try_consume_broadcast("notallowed/test").is_some()); + assert!(consumer.get_broadcast("allowed/test").is_some()); + assert!(consumer.get_broadcast("notallowed/test").is_some()); } #[tokio::test] @@ -1404,9 +1387,7 @@ mod tests { let broadcast = Broadcast::new().produce(); // Create producer limited to "a/b/c" - let limited_producer = origin - .publish_only(&["a/b/c".into()]) - .expect("should create limited producer"); + let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer"); // Should be able to publish to exact path and nested paths assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume())); @@ -1433,15 +1414,18 @@ mod tests { // Create consumers with different permissions let mut foo_consumer = origin - .consume_only(&["foo".into()]) + .consume() + .scope(&["foo".into()]) .expect("should create foo consumer"); let mut bar_consumer = origin - .consume_only(&["bar".into()]) + .consume() + .scope(&["bar".into()]) .expect("should create bar consumer"); let mut foobar_consumer = origin - .consume_only(&["foo".into(), "bar".into()]) + .consume() + .scope(&["foo".into(), "bar".into()]) .expect("should create foobar consumer"); // Each consumer should only see their allowed paths @@ -1465,16 +1449,17 @@ mod tests { // User with root "demo" allowed to subscribe to "worm-node" and "foobar" let demo_producer = origin.with_root("demo").expect("should create demo root"); let limited_producer = demo_producer - .publish_only(&["worm-node".into(), "foobar".into()]) + .scope(&["worm-node".into(), "foobar".into()]) .expect("should create limited producer"); // Publish some broadcasts assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume())); assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume())); - // consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes + // scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes let mut consumer = limited_producer - .consume_only(&["".into()]) + .consume() + .scope(&["".into()]) .expect("should create consumer with empty prefix"); // Should see both broadcasts (order depends on PathPrefixes sort) @@ -1497,7 +1482,7 @@ mod tests { // User with root "demo" allowed to subscribe to "worm-node" and "foobar" let demo_producer = origin.with_root("demo").expect("should create demo root"); let limited_producer = demo_producer - .publish_only(&["worm-node".into(), "foobar".into()]) + .scope(&["worm-node".into(), "foobar".into()]) .expect("should create limited producer"); // Publish broadcasts at different levels @@ -1505,9 +1490,10 @@ mod tests { assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume())); assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume())); - // Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY + // Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY let mut worm_consumer = limited_producer - .consume_only(&["worm-node".into()]) + .consume() + .scope(&["worm-node".into()]) .expect("should create worm-node consumer"); // Should see worm-node content with paths stripped to "" @@ -1515,9 +1501,10 @@ mod tests { worm_consumer.assert_next("worm-node/foo", &broadcast2.consume()); worm_consumer.assert_next_wait(); // Should NOT see foobar content - // Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo" + // Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo" let mut foo_consumer = limited_producer - .consume_only(&["worm-node/foo".into()]) + .consume() + .scope(&["worm-node/foo".into()]) .expect("should create worm-node/foo consumer"); foo_consumer.assert_next("worm-node/foo", &broadcast2.consume()); @@ -1533,7 +1520,7 @@ mod tests { // Producer with multiple allowed roots let limited_producer = origin - .publish_only(&["app1".into(), "app2".into(), "shared".into()]) + .scope(&["app1".into(), "app2".into(), "shared".into()]) .expect("should create limited producer"); // Publish to each root @@ -1541,9 +1528,10 @@ mod tests { assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume())); assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume())); - // consume_only with empty prefix should maintain all roots + // scope with empty prefix should maintain all roots let mut consumer = limited_producer - .consume_only(&["".into()]) + .consume() + .scope(&["".into()]) .expect("should create consumer with empty prefix"); // Should see all broadcasts from all roots @@ -1554,18 +1542,18 @@ mod tests { } #[tokio::test] - async fn test_publish_only_with_empty_prefix() { + async fn test_publish_scope_with_empty_prefix() { let origin = Origin::random().produce(); let broadcast = Broadcast::new().produce(); // Producer with specific allowed paths let limited_producer = origin - .publish_only(&["services/api".into(), "services/web".into()]) + .scope(&["services/api".into(), "services/web".into()]) .expect("should create limited producer"); - // publish_only with empty prefix should keep the same restrictions + // scope with empty prefix should keep the same restrictions let same_producer = limited_producer - .publish_only(&["".into()]) + .scope(&["".into()]) .expect("should create producer with empty prefix"); // Should still have the same publishing restrictions @@ -1583,9 +1571,7 @@ mod tests { let broadcast3 = Broadcast::new().produce(); // Producer with broad permission - let limited_producer = origin - .publish_only(&["org".into()]) - .expect("should create limited producer"); + let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer"); // Publish at various depths assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume())); @@ -1594,7 +1580,8 @@ mod tests { // Narrow down to team2 only let mut team2_consumer = limited_producer - .consume_only(&["org/team2".into()]) + .consume() + .scope(&["org/team2".into()]) .expect("should create team2 consumer"); team2_consumer.assert_next("org/team2/project1", &broadcast3.consume()); @@ -1602,7 +1589,8 @@ mod tests { // Further narrow down to team1/project1 let mut project1_consumer = limited_producer - .consume_only(&["org/team1/project1".into()]) + .consume() + .scope(&["org/team1/project1".into()]) .expect("should create project1 consumer"); // Should only see project1 content at root @@ -1616,14 +1604,14 @@ mod tests { // Producer with specific allowed paths let limited_producer = origin - .publish_only(&["allowed/path".into()]) + .scope(&["allowed/path".into()]) .expect("should create limited producer"); - // Trying to consume_only with a completely different prefix should return None - assert!(limited_producer.consume_only(&["different/path".into()]).is_none()); + // Trying to scope with a completely different prefix should return None + assert!(limited_producer.consume().scope(&["different/path".into()]).is_none()); - // Similarly for publish_only - assert!(limited_producer.publish_only(&["other/path".into()]).is_none()); + // Similarly for scope + assert!(limited_producer.scope(&["other/path".into()]).is_none()); } // Regression test for https://github.com/moq-dev/moq/issues/910 @@ -1685,17 +1673,18 @@ mod tests { // Setup: user with root "demo" allowed to subscribe to specific paths let demo_producer = origin.with_root("demo").expect("should create demo root"); let user_producer = demo_producer - .publish_only(&["worm-node".into(), "foobar".into()]) + .scope(&["worm-node".into(), "foobar".into()]) .expect("should create user producer"); // Publish some data assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume())); assert!(user_producer.publish_broadcast("foobar", broadcast2.consume())); - // Key test: consume_only with "" should maintain access to allowed roots + // Key test: scope with "" should maintain access to allowed roots let mut consumer = user_producer - .consume_only(&["".into()]) - .expect("consume_only with empty prefix should not fail when user has specific permissions"); + .consume() + .scope(&["".into()]) + .expect("scope with empty prefix should not fail when user has specific permissions"); // Should still receive broadcasts from allowed paths (order not guaranteed) let a1 = consumer.try_announced().expect("expected first announcement"); @@ -1708,7 +1697,8 @@ mod tests { // Also test that we can still narrow the scope let mut narrow_consumer = user_producer - .consume_only(&["worm-node".into()]) + .consume() + .scope(&["worm-node".into()]) .expect("should be able to narrow scope to worm-node"); narrow_consumer.assert_next("worm-node/data", &broadcast1.consume()); @@ -1720,9 +1710,9 @@ mod tests { let origin = Origin::random().produce(); let broadcast = Broadcast::new().produce(); - // publish_only with duplicate prefixes should work (deduped internally) + // scope with duplicate prefixes should work (deduped internally) let producer = origin - .publish_only(&["demo".into(), "demo".into()]) + .scope(&["demo".into(), "demo".into()]) .expect("should create producer"); assert!(producer.publish_broadcast("demo/stream", broadcast.consume())); @@ -1739,7 +1729,7 @@ mod tests { // "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain let producer = origin - .publish_only(&["demo".into(), "demo/foo".into()]) + .scope(&["demo".into(), "demo/foo".into()]) .expect("should create producer"); // Can still publish under "demo/bar" since "demo" covers everything @@ -1757,7 +1747,7 @@ mod tests { // Both "demo" and "demo/foo" are requested — should only have one node let producer = origin - .publish_only(&["demo".into(), "demo/foo".into()]) + .scope(&["demo".into(), "demo/foo".into()]) .expect("should create producer"); assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume())); @@ -1773,7 +1763,7 @@ mod tests { let origin = Origin::random().produce(); let producer = origin - .publish_only(&["demo".into(), "demo/foo".into(), "anon".into()]) + .scope(&["demo".into(), "demo/foo".into(), "anon".into()]) .expect("should create producer"); let allowed: Vec<_> = producer.allowed().collect(); @@ -1873,7 +1863,10 @@ mod tests { #[tokio::test] async fn test_announced_broadcast_disallowed() { let origin = Origin::random().produce(); - let limited = origin.consume_only(&["allowed".into()]).expect("should create limited"); + let limited = origin + .consume() + .scope(&["allowed".into()]) + .expect("should create limited"); // Path is outside allowed prefixes — should return None immediately. assert!(limited.announced_broadcast("notallowed").await.is_none()); @@ -1885,7 +1878,8 @@ mod tests { // limited to `foo/specific` can never resolve. Must return None, not loop forever. let origin = Origin::random().produce(); let limited = origin - .consume_only(&["foo/specific".into()]) + .consume() + .scope(&["foo/specific".into()]) .expect("should create limited"); // now_or_never so we fail fast instead of hanging if the guard regresses. diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index d11b82928..417ac3e3e 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -60,12 +60,12 @@ impl Cluster { /// Returns an [`OriginConsumer`] scoped to this session's subscribe permissions. pub fn subscriber(&self, token: &AuthToken) -> Option { - self.origin.with_root(&token.root)?.consume_only(&token.subscribe) + Some(self.origin.with_root(&token.root)?.scope(&token.subscribe)?.consume()) } /// Returns an [`OriginProducer`] scoped to this session's publish permissions. pub fn publisher(&self, token: &AuthToken) -> Option { - self.origin.with_root(&token.root)?.publish_only(&token.publish) + self.origin.with_root(&token.root)?.scope(&token.publish) } /// Runs the cluster event loop, dialing the configured peers and keeping