moq-lite: rewrite Origin as a poll-driven, conducer-based model #1358
kixelated wants to merge 5 commits into
Replace the OriginNode/NotifyNode tree, per-publish web_async::spawn cleanup, and per-consumer mpsc fan-out with a flat HashMap behind a parking-lot-style Mutex plus per-consumer queues. Wakers register on both the global state and each tracked broadcast's `poll_closed`, so broadcast closures wake consumers directly: no spawned cleanup tasks and no more `tokio::time::sleep(1ms)` in tests.

Renames (with one-line migrations across the workspace):

    publish_broadcast   -> publish
    create_broadcast    -> create
    consume_broadcast   -> dropped (use wait_for_broadcast / try_next)
    publish_only        -> scope
    consume_only        -> scope (on OriginConsumer)
    announced           -> next (returns OriginUpdate enum)
    try_announced       -> try_next
    announced_broadcast -> wait_for_broadcast

Active/Ended semantics, shortest-hop preference, is_clone dedup, and "newer wins ties" all preserved. Active selection still lives in the producer so relay forwarders don't have to reimplement it.

Also exposes BroadcastConsumer::poll_closed and is_closed so callers that need to compose close-detection without spawning have a primitive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
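A minimal sketch of the poll-driven shape described above, not the actual moq-lite implementation. All names here (`Shared`, `Consumer`, `Update`, `Flag`) are hypothetical, `std::sync::Mutex` stands in for the parking-lot-style mutex, and the per-broadcast `poll_closed` registration is left out. It only illustrates a flat map plus per-consumer queues where a publish wakes stored wakers directly instead of going through spawned tasks.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical update type; the real enum carries a broadcast handle in Active.
#[derive(Debug, Clone, PartialEq)]
pub enum Update {
    Active(String),
    Ended(String),
}

#[derive(Default)]
struct State {
    active: HashMap<String, ()>,   // flat map of active paths (value elided)
    queues: Vec<VecDeque<Update>>, // one pending queue per consumer
    wakers: Vec<Option<Waker>>,    // one parked waker per consumer
}

#[derive(Clone, Default)]
pub struct Shared(Arc<Mutex<State>>);

pub struct Consumer {
    shared: Shared,
    id: usize,
}

impl Shared {
    pub fn consume(&self) -> Consumer {
        let mut state = self.0.lock().unwrap();
        // A new consumer starts with everything that is already active.
        let backlog: VecDeque<Update> = state.active.keys().cloned().map(Update::Active).collect();
        state.queues.push(backlog);
        state.wakers.push(None);
        Consumer { shared: self.clone(), id: state.queues.len() - 1 }
    }

    fn distribute(&self, update: Update) {
        let mut state = self.0.lock().unwrap();
        match &update {
            Update::Active(path) => {
                state.active.insert(path.clone(), ());
            }
            Update::Ended(path) => {
                state.active.remove(path);
            }
        }
        for queue in state.queues.iter_mut() {
            queue.push_back(update.clone());
        }
        // Wake every parked consumer directly; no task is spawned.
        for slot in state.wakers.iter_mut() {
            if let Some(waker) = slot.take() {
                waker.wake();
            }
        }
    }

    pub fn publish(&self, path: &str) {
        self.distribute(Update::Active(path.to_string()));
    }

    pub fn end(&self, path: &str) {
        self.distribute(Update::Ended(path.to_string()));
    }
}

impl Consumer {
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Update> {
        let mut state = self.shared.0.lock().unwrap();
        if let Some(update) = state.queues[self.id].pop_front() {
            return Poll::Ready(update);
        }
        // Nothing pending: park the waker so the next publish wakes us.
        state.wakers[self.id] = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that records whether it was woken, for driving `poll_next` by hand.
#[derive(Default)]
pub struct Flag(pub AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}
```

Because the waker is stored under the same lock that guards the queues, a publish between "queue was empty" and "waker registered" cannot be missed in this sketch, which is one reason tests built on this shape should not need a sleep.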
Walkthrough
This PR refactors the core origin broadcast management system from a per-tree notification architecture to a centralized mutex-guarded state model.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
rs/moq-ffi/src/origin.rs (1)
118-123: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
This no longer waits for the exact broadcast path.
`with_root(path)` subscribes to the whole subtree under `path`, and `Announced::available()` returns the first `Active` update it sees. That means `announced_broadcast("foo")` can resolve with `foo/bar` if the nested broadcast arrives first. Please drive this through `OriginConsumer::wait_for_broadcast(path)` or store the requested path and filter for an exact match.
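The exact-match filter can be sketched as follows. This is an illustration only: `Update` is a hypothetical stand-in for the origin update enum with the broadcast handle simplified to a `u32`, and `find_exact` is an invented helper name, not an existing moq-lite API.

```rust
// Hypothetical stand-in for the origin update enum; the broadcast is a plain u32 here.
#[derive(Debug, Clone, PartialEq)]
pub enum Update {
    Active(String, u32),
    Ended(String),
}

/// Returns the first `Active` whose path equals `wanted` exactly,
/// skipping descendants such as "foo/bar" when "foo" was requested.
pub fn find_exact(updates: impl IntoIterator<Item = Update>, wanted: &str) -> Option<u32> {
    updates.into_iter().find_map(|update| match update {
        Update::Active(path, broadcast) if path == wanted => Some(broadcast),
        // Descendant paths and Ended updates are ignored.
        _ => None,
    })
}
```

With a scoped subscription, the same predicate would run inside whatever loop drains the consumer, so a nested path arriving first no longer satisfies the request.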
rs/moq-lite/src/lite/subscriber.rs (1)
223-232: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Handle rejected origin publishes before registering the announce locally.
`OriginProducer::publish()` can return `false` here, but this path still keeps the entry in `producers` and spawns `run_broadcast`. If the publish was rejected, the announce becomes locally “active” even though no origin consumer can ever see it, and a later real announce on the same path will trip the duplicate-path check. Roll back the inserted producer and skip spawning when `publish()` returns `false`.
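A rough sketch of the rollback, under simplifying assumptions: `Origin`, `Subscriber`, and `start_announce` are hypothetical stand-ins, the producer value is a placeholder `u32`, and a `Vec<String>` stands in for the spawned `run_broadcast` tasks.

```rust
use std::collections::HashMap;

// Hypothetical origin whose `publish` returns false when it rejects the broadcast.
pub struct Origin {
    pub accept: bool,
}

impl Origin {
    pub fn publish(&mut self, _path: &str) -> bool {
        self.accept
    }
}

#[derive(Default)]
pub struct Subscriber {
    pub producers: HashMap<String, u32>, // placeholder for the real producer handle
    pub spawned: Vec<String>,            // stands in for spawned run_broadcast tasks
}

impl Subscriber {
    /// Returns Ok(true) if the announce became active locally, Ok(false) if the
    /// origin rejected it, and Err on a duplicate path.
    pub fn start_announce(&mut self, origin: &mut Origin, path: &str) -> Result<bool, String> {
        if self.producers.contains_key(path) {
            return Err(format!("duplicate announce: {path}"));
        }
        self.producers.insert(path.to_string(), 0);

        if !origin.publish(path) {
            // Roll back so a later announce on this path is not seen as a duplicate.
            self.producers.remove(path);
            return Ok(false);
        }

        // Only start the per-broadcast task once the publish was accepted.
        self.spawned.push(path.to_string());
        Ok(true)
    }
}
```

Whether a rejection should surface as `Ok(false)` or as an error is a design choice for the real code; the point of the sketch is only that the map entry and the spawn are both conditional on `publish` succeeding.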
rs/moq-lite/src/ietf/subscriber.rs (1)
415-423: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Don't treat a rejected `publish()` as a successful announce.
This migration now calls `OriginProducer::publish(...)`, but the return value is ignored. If the origin rejects the publish, `state.broadcasts` still records the path and `run_broadcast` still starts, so later announces on that path can be rejected as duplicates even though nothing was actually published. Please fail or roll back the entry when `publish()` returns `false`.
rs/moq-boy/src/input.rs (1)
72-97: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Avoid emitting `ViewerLeft` directly on `OriginUpdate::Ended`.
This can produce duplicate/spurious offline events during path handoff (`Ended` may be immediately followed by `Active`), and the spawned command task already emits `ViewerLeft` when the stream actually ends.
Suggested adjustment

    - let (path, broadcast) = match update {
    -     moq_lite::OriginUpdate::Active(p, b) => (p, Some(b)),
    -     moq_lite::OriginUpdate::Ended(p) => (p, None),
    - };
    + let (path, broadcast) = match update {
    +     moq_lite::OriginUpdate::Active(p, b) => (p, b),
    +     moq_lite::OriginUpdate::Ended(p) => {
    +         tracing::debug!(viewer_id = %p, "viewer broadcast ended");
    +         continue;
    +     }
    + };
      let viewer_id = path.to_string();
    - if let Some(broadcast) = broadcast {
    + {
          tracing::info!(%viewer_id, "viewer connected");
          let cmd_tx = cmd_tx.clone();
          let vid = viewer_id.clone();
          tokio::spawn(async move {
              if let Err(e) = handle_viewer_commands(&vid, broadcast, &cmd_tx).await {
                  tracing::warn!(viewer_id = %vid, error = %e, "viewer command error");
              }
              tracing::info!(viewer_id = %vid, "viewer disconnected");
              let _ = cmd_tx.send(Command::ViewerLeft { viewer_id: vid }).await;
          });
    - } else {
    -     tracing::info!(%viewer_id, "viewer went offline");
    -     let _ = cmd_tx
    -         .send(Command::ViewerLeft {
    -             viewer_id: viewer_id.clone(),
    -         })
    -         .await;
      }
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 113-126: The current scoped-consumer usage can return a descendant
(e.g., "foo/bar") first and incorrectly fail; after creating the consumer via
origin.consume().scope(&[path.as_path()]) you must drain the consumer (loop over
consumer.try_next()) until you find an OriginUpdate::Active where p.as_path() ==
path and then return Ok(b), or until try_next() returns None in which case
return Err(Error::BroadcastNotFound); keep the same origin.consume(),
scope(&[path.as_path()]), and matching on moq_lite::OriginUpdate::Active(p, b)
but iterate instead of a single try_next() call.
- Around line 135-137: The current wrapper in origin.rs calls
self.active.get_mut(origin) and then origin.publish(path, broadcast) but always
returns Ok(()), which masks rejections signaled by OriginProducer::publish()
returning false; update the code to check the boolean result of
OriginProducer::publish(path, broadcast) and propagate failure by returning an
Err variant (e.g., Err(Error::PublishRejected) or an appropriate existing Error)
when publish returns false, otherwise return Ok(()). Ensure you reference the
same origin variable from self.active.get_mut(origin) and preserve existing
error handling for get_mut (ok_or(Error::OriginNotFound)?).
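The second inline comment above amounts to mapping the boolean onto the wrapper's error type. A small sketch, assuming a hypothetical `OriginProducer` stub and an `Error::PublishRejected` variant (the name the comment itself floats as an example; it may not exist in libmoq):

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum Error {
    OriginNotFound,
    PublishRejected, // assumed variant, named after the suggestion in the comment
}

// Hypothetical producer stub: `publish` returns false when the origin rejects the broadcast.
pub struct OriginProducer {
    pub accept: bool,
}

impl OriginProducer {
    pub fn publish(&mut self, _path: &str) -> bool {
        self.accept
    }
}

pub fn publish(
    active: &mut HashMap<u32, OriginProducer>,
    origin: u32,
    path: &str,
) -> Result<(), Error> {
    let origin = active.get_mut(&origin).ok_or(Error::OriginNotFound)?;
    // Propagate the rejection instead of unconditionally returning Ok(()).
    if origin.publish(path) {
        Ok(())
    } else {
        Err(Error::PublishRejected)
    }
}
```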
---
Outside diff comments:
In `@rs/moq-boy/src/input.rs`:
- Around line 72-97: Don't send Command::ViewerLeft when matching
moq_lite::OriginUpdate::Ended; that causes duplicate/offline events because the
spawned task from the Active case already sends ViewerLeft when the stream
closes. In the match/if block around OriginUpdate::Active/Ended, keep the
existing tokio::spawn + handle_viewer_commands flow for Active (including the
send in the spawned task), but for the Ended branch remove the
cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log
(tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft
is only emitted by the spawned task handling the broadcast and prevents spurious
duplicate events.
In `@rs/moq-ffi/src/origin.rs`:
- Around line 118-123: The function announced_broadcast currently uses
self.inner.clone().with_root(path) which subscribes to the whole subtree and
returns the first Active update (via Announced::available), allowing nested
paths (e.g., "foo/bar") to satisfy a request for "foo". Change
announced_broadcast to obtain the exact broadcast origin for the requested path
by calling the consumer method that waits for an exact match
(OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping with_root,
capture and store the requested path and filter Announced::available events
until the update.path == requested_path; then construct the
MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and
return it as before.
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 415-423: The Vacant-branch currently ignores
OriginProducer::publish(...)’s boolean result so a rejected publish still
inserts a BroadcastState and returns a Broadcast; update the Entry::Vacant
handling (the block that creates Broadcast::new().produce(), calls
origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast)
to check the publish return value and abort/roll back on false: if publish(...)
returns false, do not insert into the map (do not create or record
BroadcastState in state.broadcasts), do not start run_broadcast for that path,
and return an error or a None/appropriate failure value to the caller instead of
the broadcast; ensure you reference OriginProducer::publish, BroadcastState,
Entry::Vacant and run_broadcast when making the change so the insert and
broadcast-start are skipped/undone on rejection.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 223-232: The code currently inserts the producer and then calls
OriginProducer::publish(...); if publish returns false you must roll back that
insertion and avoid spawning run_broadcast. Change the flow so after creating
`dynamic = broadcast.dynamic()` you call `let published =
self.origin.as_mut().unwrap().publish(path.clone(), broadcast.consume());` and
if `published` is false remove the just-inserted producer from `self.producers`
(or undo whatever created the local announce) and do not call
`web_async::spawn(self.clone().run_broadcast(path, dynamic));` — only spawn
run_broadcast when publish returns true. Ensure the rollback targets the same
producer entry created earlier so duplicate-path checks remain correct.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3a30bf84-46cf-4da8-a44d-256ee6e5983f
📒 Files selected for processing (23)
- rs/hang/examples/subscribe.rs
- rs/hang/examples/video.rs
- rs/libmoq/src/origin.rs
- rs/moq-boy/src/input.rs
- rs/moq-boy/src/main.rs
- rs/moq-cli/src/client.rs
- rs/moq-cli/src/server.rs
- rs/moq-clock/src/main.rs
- rs/moq-ffi/src/origin.rs
- rs/moq-gst/src/sink/imp.rs
- rs/moq-gst/src/source/imp.rs
- rs/moq-lite/src/ietf/publisher.rs
- rs/moq-lite/src/ietf/subscriber.rs
- rs/moq-lite/src/lite/publisher.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/broadcast.rs
- rs/moq-lite/src/model/origin.rs
- rs/moq-native/examples/chat.rs
- rs/moq-native/tests/backend.rs
- rs/moq-native/tests/broadcast.rs
- rs/moq-relay/src/cluster.rs
- rs/moq-relay/src/connection.rs
- rs/moq-relay/src/web.rs
# Conflicts:
#   rs/libmoq/src/origin.rs
#   rs/moq-clock/src/main.rs
#   rs/moq-lite/src/ietf/publisher.rs
#   rs/moq-lite/src/lite/publisher.rs
#   rs/moq-lite/src/model/broadcast.rs
#   rs/moq-lite/src/model/origin.rs
#   rs/moq-relay/src/cluster.rs
#   rs/moq-relay/src/web.rs
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rs/moq-relay/src/web.rs (1)
465-474: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Snapshot can include ended or duplicate paths.
The drain only inspects `Active` updates, so the snapshot is wrong if the queue contains either of:
- `Active(p, _)` followed by `Ended(p)`: `p` is reported as currently announced even though it just ended;
- `Active(p, b1)` followed by `Ended(p)` and `Active(p, b2)` (the documented sequence when a path is replaced): `p` appears twice in the response.
Per `OriginUpdate`'s docs (rs/moq-lite/src/model/origin.rs:214-231), `Active` for a replacement is preceded by `Ended` for the same path, so duplicates are reachable in normal operation, not just edge cases.
Tracking both variants in a small set keeps the HTTP `/announced` snapshot consistent without changing its semantics.
🛠️ Suggested fix

    - let mut broadcasts = Vec::new();
    -
    - while let Some(update) = origin.try_next() {
    -     if let moq_lite::OriginUpdate::Active(suffix, _) = update {
    -         broadcasts.push(suffix);
    -     }
    - }
    -
    - Ok(broadcasts.iter().map(|p| p.to_string()).collect::<Vec<_>>().join("\n"))
    + let mut broadcasts = std::collections::BTreeSet::new();
    +
    + while let Some(update) = origin.try_next() {
    +     match update {
    +         moq_lite::OriginUpdate::Active(suffix, _) => {
    +             broadcasts.insert(suffix);
    +         }
    +         moq_lite::OriginUpdate::Ended(suffix) => {
    +             broadcasts.remove(&suffix);
    +         }
    +     }
    + }
    +
    + Ok(broadcasts.iter().map(|p| p.to_string()).collect::<Vec<_>>().join("\n"))
rs/moq-lite/src/ietf/publisher.rs (1)
446-528: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Synchronous PublishNamespace handshake serializes the entire announce loop.
Each `OriginUpdate::Active` opens a bidi stream and inline-awaits the peer's response (`stream.reader.decode().await?` / `read_exact(...).await?` on lines 479–481). Until that single response arrives, no other update, including subsequent `Active`s and any `Ended`, can be drained from `self.origin.next()`. A slow or unresponsive peer therefore stalls all announce traffic on this session, and an `Ended` for a never-acked namespace can sit in the queue indefinitely. Any decoding error or unexpected message (line 497, 501) also fails the entire session.
Consider spawning a per-namespace task that owns the stream + handshake + cleanup, and have `run_announce` dispatch updates to it (e.g., via `web_async::spawn` and a small registry keyed by `PathOwned`). This would decouple peer responsiveness from the update draining loop and improve overall liveness under slow/adversarial peers.
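To make the proposed shape concrete, here is a deliberately simplified sketch that uses OS threads and `std::sync::mpsc` in place of async tasks and streams. Everything in it is hypothetical (`Update`, `Worker`, `spawn_namespace`, the string log); the real code would spawn async tasks and own a network stream instead. It only shows the dispatch loop routing updates through a registry without ever waiting on a per-namespace handshake.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for the origin updates; paths are plain strings here.
pub enum Update {
    Active(String),
    Ended(String),
}

struct Worker {
    end: Sender<()>,
    handle: JoinHandle<Vec<String>>,
}

// Each worker owns its own handshake and cleanup, so a slow peer only blocks this thread.
fn spawn_namespace(path: String) -> Worker {
    let (end, end_rx) = channel::<()>();
    let handle = thread::spawn(move || {
        let mut log = vec![format!("handshake {path}")];
        // Wait for the Ended signal; a dropped sender also ends the wait.
        let _ = end_rx.recv();
        log.push(format!("done {path}"));
        log
    });
    Worker { end, handle }
}

// The dispatch loop only routes updates; it never waits on a peer.
pub fn run_announce(updates: impl IntoIterator<Item = Update>) -> Vec<String> {
    let mut registry: HashMap<String, Worker> = HashMap::new();
    let mut finishing: Vec<JoinHandle<Vec<String>>> = Vec::new();

    for update in updates {
        match update {
            Update::Active(path) => {
                registry.insert(path.clone(), spawn_namespace(path));
            }
            Update::Ended(path) => {
                if let Some(worker) = registry.remove(&path) {
                    let _ = worker.end.send(());
                    finishing.push(worker.handle);
                }
            }
        }
    }

    // Session shutdown: dropping the remaining senders releases their workers.
    for (_, worker) in registry.drain() {
        drop(worker.end);
        finishing.push(worker.handle);
    }
    finishing
        .into_iter()
        .flat_map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

In this sketch a worker that fails would only affect its own log, which mirrors the suggestion that per-namespace errors be logged rather than returned from `run_announce`.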
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-boy/src/main.rs`:
- Line 222: The call to publish_origin.publish(&broadcast_path,
broadcast.consume()) currently ignores its boolean return; change it to assert
success using anyhow::ensure! so rejected publishes produce an error like
elsewhere (e.g., rs/moq-gst/src/sink/imp.rs). Specifically, after calling
publish_origin.publish(&broadcast_path, broadcast.consume()), use
anyhow::ensure!(result, "failed to publish broadcast for path {}: rejected by
OriginProducer", broadcast_path) (or equivalent) so a rejected publish returns
an error instead of silently proceeding; reference the publish_origin,
broadcast_path, and broadcast symbols when locating the code to modify.
In `@rs/moq-cli/src/client.rs`:
- Line 9: The call to origin.publish(&name, publish.consume()) currently ignores
the bool return that signals rejection; update the code to check the returned
bool and fail fast on rejection (e.g., use anyhow::ensure!(origin.publish(&name,
publish.consume()), "failed to publish {}: rejected by origin", name)) so that a
rejected publish does not let reconnect.closed() continue with no broadcast
registered; mirror the ensure pattern used around publish handling in the other
module to propagate the error instead of discarding the result.
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 202-230: The Active branch may skip sending Announce::Active when
hops.push(self_origin).is_err(), but the Ended branch still sends
Announce::Ended and causes the subscriber to error; fix by tracking suppressed
paths: create a HashSet (e.g., suppressed_paths) in the same scope as the
announce loop, and when hops.push(self_origin).is_err() insert the full path (or
the same key used by subscriber) into suppressed_paths and continue; in the
OriginUpdate::Ended handler, check suppressed_paths.remove(&path) first and if
it was present, skip encoding/sending the Announce::Ended, otherwise proceed to
encode/send as before (ensuring you use the same path form used for
suppression/lookup).
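The suppressed-path bookkeeping from the last inline comment can be sketched roughly like this. The types are simplified stand-ins (`OriginUpdate` carries a hop count instead of a hop list, `MAX_HOPS` is an arbitrary value, and `forward` collects into a `Vec` rather than encoding onto a stream), so treat it as an illustration of the symmetry rather than the publisher's actual code.

```rust
use std::collections::HashSet;

// Hypothetical, simplified update: `hops` is the current hop count for the path.
#[derive(Debug, Clone, PartialEq)]
pub enum OriginUpdate {
    Active { path: String, hops: usize },
    Ended(String),
}

#[derive(Debug, PartialEq)]
pub enum Announce {
    Active(String),
    Ended(String),
}

pub const MAX_HOPS: usize = 3; // arbitrary limit for the sketch

pub fn forward(updates: impl IntoIterator<Item = OriginUpdate>) -> Vec<Announce> {
    // Paths whose Active was dropped because adding our own hop would exceed the limit.
    let mut suppressed: HashSet<String> = HashSet::new();
    let mut out = Vec::new();

    for update in updates {
        match update {
            OriginUpdate::Active { path, hops } => {
                if hops + 1 > MAX_HOPS {
                    suppressed.insert(path);
                    continue;
                }
                out.push(Announce::Active(path));
            }
            OriginUpdate::Ended(path) => {
                // The peer never saw an Active for this path, so skip the Ended too.
                if suppressed.remove(&path) {
                    continue;
                }
                out.push(Announce::Ended(path));
            }
        }
    }
    out
}
```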
---
Outside diff comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 446-528: The announce loop in run_announce blocks on the
synchronous per-namespace handshake (calls like stream.reader.decode().await and
read_exact(...) in the OriginUpdate::Active branch), which serializes the entire
loop and lets any error abort the session; refactor so run_announce only
dispatches updates and does not await the peer response: spawn a per-namespace
task (e.g., via web_async::spawn) that takes ownership of the Stream, performs
the PublishNamespace handshake and local decoding (previously done around
stream.reader.decode()/read_exact()), inserts itself into a small registry keyed
by PathOwned (previously namespace_streams), and handles cleanup on
OriginUpdate::Ended by receiving a signal or lookup+close; ensure the
per-namespace task catches and logs decoding/IO errors instead of returning Err
from run_announce so a bad/slow peer won’t stall or kill the whole session.
In `@rs/moq-relay/src/web.rs`:
- Around line 465-474: The current drain loop only pushes OriginUpdate::Active
suffixes into broadcasts, allowing ended or duplicate paths to appear; change
the logic to maintain a small set (e.g., HashSet<String>) instead of Vec: when
origin.try_next() yields OriginUpdate::Active(suffix, _) insert suffix.into()
into the set, and when it yields OriginUpdate::Ended(suffix) remove that suffix
from the set; after the loop, collect the set into a Vec (optionally sort for
stable output) and join with "\n" instead of using broadcasts.iter(), ensuring
the snapshot reflects currently announced (non-ended, non-duplicated) paths —
update references around origin.try_next(), OriginUpdate::Active,
OriginUpdate::Ended, and the broadcasts variable accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fe8ca23a-ae99-432f-8389-e2f3f2f976e0
📒 Files selected for processing (14)
- rs/hang/examples/subscribe.rs
- rs/hang/examples/video.rs
- rs/libmoq/src/error.rs
- rs/libmoq/src/origin.rs
- rs/moq-boy/src/main.rs
- rs/moq-cli/src/client.rs
- rs/moq-cli/src/main.rs
- rs/moq-gst/src/sink/imp.rs
- rs/moq-gst/src/source/imp.rs
- rs/moq-lite/src/ietf/publisher.rs
- rs/moq-lite/src/ietf/subscriber.rs
- rs/moq-lite/src/lite/publisher.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-relay/src/web.rs
… etc.)

Reverts the method/type renames the rewrite introduced and restores the names already in use on main, since those are clearer and avoid churn at every existing call site.

| PR rewrite name        | Restored to main name  |
| ---------------------- | ---------------------- |
| publish                | publish_broadcast      |
| create                 | create_broadcast       |
| next (async)           | announced              |
| try_next               | try_announced          |
| wait_for_broadcast     | announced_broadcast    |
| OriginUpdate           | OriginAnnounce         |

The behavior (poll-driven conducer model, Active/Ended enum, is_closed and poll_closed primitives) is unchanged.
Actionable comments posted: 2
♻️ Duplicate comments (1)
rs/moq-lite/src/lite/publisher.rs (1)
204-230: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
MAX_HOPS suppression still asymmetric: Active dropped but Ended still emitted.
When `hops.push(self_origin).is_err()` at line 211, the Active is suppressed (line 216 `continue`). However, the matching `OriginAnnounce::Ended(path)` arm (lines 221-230) unconditionally encodes `Announce::Ended`. The peer in `lite/subscriber.rs` does `producers.remove(&path).ok_or(Error::NotFound)?` for an Ended it has no record of, which propagates `Error::NotFound` and aborts the announce stream.
Track suppressed paths so the corresponding Ended is also suppressed (see prior review for sketch).
🧹 Nitpick comments (3)
rs/moq-lite/src/model/origin.rs (2)
387-401: 💤 Low value
Per-consumer pending queues are unbounded.
`distribute` does an unbounded `push_back` per consumer with no high-water mark. This is an intentional trade-off vs the previous mpsc design (per the PR description, avoids the tokio 127-message bug), and announcement events are typically low-rate, so this is fine in practice. Worth a brief comment near `ConsumerQueue` documenting the choice so future contributors don't reintroduce a bound that breaks the no-blocking guarantee for `publish`.
688-743: 💤 Low value
Phase 4/5 waiter registration may accumulate across iterations.
When `close_detected` triggers a `continue` back to Phase 1, the `waiter` was already registered both globally (Phase 4) and on one or more `poll_closed` slots (Phase 5). Re-registration in the next iteration will append duplicate entries rather than replace them, since `conducer::Waiter::register` appends new `Weak<Waker>` slots instead of deduplicating by waker identity. This results in redundant wake-ups on the same waiter: still correct, but wasteful of space in the `WaiterList`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/model/origin.rs` around lines 688 - 743, The waiter can get registered twice across loops because Phase 4 registers globally before Phase 5 checks per-active closures; move the global registration so it happens after the loop that calls entry.active.poll_closed (i.e., perform the per-active poll_closed checks first, and only call waiter.register(&mut state.waiters) if no close_detected), ensuring poll_announced doesn't append duplicate waiter entries when it continues the loop; adjust the code in poll_announced around the waiter.register(&mut state.waiters) and the for (path, entry) .. entry.active.poll_closed(...) section accordingly.
rs/moq-native/tests/broadcast.rs (1)
81-91: 💤 Low value
LGTM: consistent enum-match migration across all three transports.
The `Active` branch verifies the announced path and yields `bc`; the `Ended` arm is a fail-fast assertion, which is appropriate for a smoke test that publishes once and expects exactly one Active update.
Minor nit (skip if not interesting): the same 7-line match is now copy-pasted three times. Extracting a small helper like `await_active(announcements, "test")` would centralize the assertion message and keep future API drift to one site. Not worth blocking on.
Also applies to: 425-435, 536-546
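One possible shape for such a helper, shown here as a synchronous sketch over a hypothetical generic `OriginAnnounce<B>` enum and an invented name `expect_active`. The real helper would presumably be async and wrap the `tokio::time::timeout(...)` call from the tests; that part is omitted so the example stays self-contained.

```rust
// Hypothetical stand-in for moq_lite::OriginAnnounce, generic over the broadcast type.
#[derive(Debug)]
pub enum OriginAnnounce<B> {
    Active(String, B),
    Ended(String),
}

/// Unwraps one announce, asserts it is `Active` for `expected`, and returns the broadcast.
/// Panics with a single, shared message otherwise.
pub fn expect_active<B>(announce: Option<OriginAnnounce<B>>, expected: &str) -> B {
    match announce.expect("announce stream closed") {
        OriginAnnounce::Active(path, bc) => {
            assert_eq!(path, expected, "unexpected announced path");
            bc
        }
        OriginAnnounce::Ended(path) => {
            panic!("expected Active for {expected}, got Ended({path})")
        }
    }
}
```

Each of the three call sites would then reduce to one line that passes the awaited announce and the expected path.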
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-native/tests/broadcast.rs` around lines 81 - 91, Extract the repeated 7-line match into a small helper (e.g., await_active) that takes the announcements stream/handle and an expected path string (and optionally TIMEOUT) and performs the tokio::time::timeout(...).await.expect(...).expect(...), matches on moq_lite::OriginAnnounce::{Active(path, bc) | Ended(path)}, asserts path == expected and returns bc; then replace the three inline match blocks with calls to await_active(announcements, "test") (or the appropriate expected path) to centralize the assertion and reduce duplication.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 112-130: Update the workspace MSRV to at least Rust 1.88 in the
root Cargo.toml so the let-chain syntax used in the consume function compiles
under the declared toolchain, and modify the path handling in
rs/libmoq/src/origin.rs: after calling path.as_path() to obtain a Path<'_> (the
local variable named path), remove the redundant path.as_path() call and pass
the path directly to origin.consume().scope (e.g. use &[path] or &[path.clone()]
as needed to satisfy borrowing/lifetime requirements) so scope receives a slice
of Path without the extra conversion.
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 511-523: The match on Version should treat the newest-draft
behavior as the default so future drafts inherit it: change the arm that
explicitly lists Version::Draft17 to a wildcard default and keep the older
drafts explicitly listed; i.e., have Version::Draft14 | Version::Draft15 |
Version::Draft16 => send the ietf::PublishNamespaceDone (using
stream.writer.encode_message with suffix.as_path() and request_id), and replace
Version::Draft17 => {} with _ => {} so Draft17+ default to the no-op behavior.
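
A minimal sketch of that match shape, assuming a hypothetical four-variant `Version` enum and a hypothetical `sends_namespace_done` helper (neither is taken from the moq-lite source):

```rust
/// Hypothetical subset of the draft versions named in the review.
#[derive(Clone, Copy, Debug)]
pub enum Version {
    Draft14,
    Draft15,
    Draft16,
    Draft17,
}

/// Returns true when the publisher should send `PublishNamespaceDone` for this draft.
pub fn sends_namespace_done(version: Version) -> bool {
    match version {
        // Older drafts are listed explicitly.
        Version::Draft14 | Version::Draft15 | Version::Draft16 => true,
        // Draft17 and any variant added later fall through to the newest behavior.
        _ => false,
    }
}
```

With the wildcard arm, adding a later draft variant needs no change here unless its behavior differs from Draft17.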
---
Duplicate comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 204-230: When hops.push(self_origin).is_err() suppresses an Active
announce in the OriginAnnounce handling, also record that suffix/path in a local
suppressed set so the matching OriginAnnounce::Ended handling checks and skips
encoding lite::Announce::Ended for those suppressed paths; specifically, add a
HashSet (e.g., suppressed_paths) in the publisher scope, insert the suffix/path
when the Active branch hits the hops.push(...) error, and in the
OriginAnnounce::Ended branch check suppressed_paths first — if present, remove
it and continue (skip encoding), otherwise proceed to create and encode
lite::Announce::Ended as before; this keeps behavior symmetric with subscriber
logic (which expects Ended only for known producers removed via
producers.remove).
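
The suppressed-set bookkeeping could be sketched as below. The `Update` enum, the `forward` function, the string "wire messages", and the `MAX_HOPS` value are all hypothetical stand-ins for illustration; only the idea of pairing a dropped Active with a dropped Ended comes from the comment above.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the origin updates the publisher forwards.
pub enum Update {
    /// A path became active, carrying the hop count accumulated so far.
    Active { path: String, hops: usize },
    Ended { path: String },
}

/// Hypothetical hop limit; the real constant lives in moq-lite.
pub const MAX_HOPS: usize = 4;

/// Returns the messages that would be sent, skipping any `Ended`
/// whose matching `Active` was suppressed by the hop limit.
pub fn forward(updates: Vec<Update>) -> Vec<String> {
    let mut suppressed: HashSet<String> = HashSet::new();
    let mut sent = Vec::new();
    for update in updates {
        match update {
            Update::Active { path, hops } => {
                if hops >= MAX_HOPS {
                    // Hop chain is full: drop the announce and remember the path.
                    suppressed.insert(path);
                } else {
                    // A path announced within the limit is no longer suppressed.
                    suppressed.remove(&path);
                    sent.push(format!("active:{path}"));
                }
            }
            Update::Ended { path } => {
                // `remove` returns true when the Active was suppressed, so the Ended is skipped too.
                if !suppressed.remove(&path) {
                    sent.push(format!("ended:{path}"));
                }
            }
        }
    }
    sent
}
```

Removing the path from the set when its Ended arrives keeps the set bounded by the number of currently suppressed paths.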
---
Nitpick comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 387-401: The per-consumer pending queues are intentionally
unbounded to avoid blocking/pushing back in publish (see distribute) — add a
concise comment on the ConsumerQueue type (and/or its pending field) stating
that pending is intentionally unbounded to preserve the no-blocking guarantee
for publish, explaining the trade-off vs bounded channels (mentioning the tokio
127-message mpsc issue) and that this is deliberate to avoid reintroducing a
capacity that could block; reference ConsumerQueue and distribute in the comment
so future contributors understand the rationale.
- Around line 688-743: The waiter can get registered twice across loops because
Phase 4 registers globally before Phase 5 checks per-active closures; move the
global registration so it happens after the loop that calls
entry.active.poll_closed (i.e., perform the per-active poll_closed checks first,
and only call waiter.register(&mut state.waiters) if no close_detected),
ensuring poll_announced doesn't append duplicate waiter entries when it
continues the loop; adjust the code in poll_announced around the
waiter.register(&mut state.waiters) and the for (path, entry) ..
entry.active.poll_closed(...) section accordingly.
In `@rs/moq-native/tests/broadcast.rs`:
- Around line 81-91: Extract the repeated 7-line match into a small helper
(e.g., await_active) that takes the announcements stream/handle and an expected
path string (and optionally TIMEOUT) and performs the
tokio::time::timeout(...).await.expect(...).expect(...), matches on
moq_lite::OriginAnnounce::{Active(path, bc) | Ended(path)}, asserts path ==
expected and returns bc; then replace the three inline match blocks with calls
to await_active(announcements, "test") (or the appropriate expected path) to
centralize the assertion and reduce duplication.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d989bb11-ca4c-490f-a6bb-f12532ea0511
📒 Files selected for processing (12)
- rs/hang/examples/subscribe.rs
- rs/libmoq/src/origin.rs
- rs/moq-boy/src/input.rs
- rs/moq-clock/src/main.rs
- rs/moq-ffi/src/origin.rs
- rs/moq-lite/src/ietf/publisher.rs
- rs/moq-lite/src/lite/publisher.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/origin.rs
- rs/moq-native/tests/backend.rs
- rs/moq-native/tests/broadcast.rs
- rs/moq-relay/src/web.rs
| pub fn consume<P: moq_lite::AsPath>(&mut self, origin: Id, path: P) -> Result<moq_lite::BroadcastConsumer, Error> { | ||
| use moq_lite::AsPath; | ||
| let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; | ||
| // TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait | ||
| // for gossip instead of racing it. | ||
| // Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume(). | ||
| #[allow(deprecated)] | ||
| origin.get_broadcast(path).ok_or(Error::BroadcastNotFound) | ||
| // TODO: expose an async variant so FFI callers can wait for gossip instead of racing it. | ||
| // Scope is prefix-based, so drain the replay queue until we find an exact path match. | ||
| let path = path.as_path(); | ||
| let mut consumer = origin | ||
| .consume() | ||
| .scope(&[path.as_path()]) | ||
| .ok_or(Error::BroadcastNotFound)?; | ||
| while let Some(update) = consumer.try_announced() { | ||
| if let moq_lite::OriginAnnounce::Active(p, b) = update | ||
| && p.as_path() == path | ||
| { | ||
| return Ok(b); | ||
| } | ||
| } | ||
| Err(Error::BroadcastNotFound) | ||
| } |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm every crate using let-chains is on edition 2024 and that workspace MSRV is >= 1.88.
fd -t f -e toml | xargs rg -nP '^\s*(edition|rust-version)\s*='
Repository: moq-dev/moq
Length of output: 763
🏁 Script executed:
#!/bin/bash
# Check if moq-gst depends on libmoq
grep -E "libmoq|moq_lite" rs/moq-gst/Cargo.toml
# Also get details on the moq_lite::AsPath trait to verify the redundancy claim
fd -t f -name "*.rs" -path "*/moq_lite/*" | xargs rg -l "trait AsPath" | head -1 | xargs cat -n
Repository: moq-dev/moq
Length of output: 34780
🏁 Script executed:
#!/bin/bash
# Check moq-gst Cargo.toml for dependencies on libmoq or moq_lite
cat rs/moq-gst/Cargo.toml | grep -E "libmoq|moq.lite"
Repository: moq-dev/moq
Length of output: 87
Update workspace MSRV from 1.85 to 1.88 or higher to support the let-chain syntax in lines 123–125.
Let chains were stabilized in Rust 1.88 and require edition 2024. This code is on the correct edition, but the workspace declares rust-version = "1.85" in the root Cargo.toml, which is incompatible. Any attempt to build with Rust 1.85–1.87 will fail with a let-chain feature error. Update the workspace MSRV to at least 1.88.
Additionally, path.as_path() on line 120 is redundant — path is already a Path<'_> after line 117, so use &[path] instead (or &[path.clone()] if needed).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/libmoq/src/origin.rs` around lines 112 - 130, Update the workspace MSRV to
at least Rust 1.88 in the root Cargo.toml so the let-chain syntax used in the
consume function compiles under the declared toolchain, and modify the path
handling in rs/libmoq/src/origin.rs: after calling path.as_path() to obtain a
Path<'_> (the local variable named path), remove the redundant path.as_path()
call and pass the path directly to origin.consume().scope (e.g. use &[path] or
&[path.clone()] as needed to satisfy borrowing/lifetime requirements) so scope
receives a slice of Path without the extra conversion.
- lite/publisher.rs: when an Active is dropped because the hop chain hits
MAX_HOPS, also drop the matching Ended. The subscriber treats Ended for
an unknown path as Error::NotFound, so the asymmetric pair was a real
protocol error.
- ietf/publisher.rs: replace `Version::Draft17 => {}` arms with `_ => {}`
so future drafts inherit v17+ behavior by default (per CLAUDE.md).
- libmoq/origin.rs: split the let-chain into nested if so the code stays
within the workspace MSRV (1.85; let-chains require 1.88).
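
For reference, the nested-`if` form of the exact-path lookup might look like this. It is a sketch with hypothetical stand-ins (`Announce` with a `u32` payload in place of `moq_lite::OriginAnnounce` and its broadcast consumer, and a `find_exact` helper that does not exist in libmoq); it avoids the let-chain so it builds on toolchains older than 1.88.

```rust
/// Hypothetical stand-in for `moq_lite::OriginAnnounce`; the payload is a plain id here.
pub enum Announce {
    Active(String, u32),
    Ended(String),
}

/// Drains queued updates and returns the payload of the first `Active`
/// whose path matches `path` exactly (scope matching is prefix-based).
pub fn find_exact(updates: impl IntoIterator<Item = Announce>, path: &str) -> Option<u32> {
    for update in updates {
        // Let-chain form (Rust 1.88+): `if let Announce::Active(p, b) = update && p == path`.
        if let Announce::Active(p, b) = update {
            if p == path {
                return Some(b);
            }
        }
    }
    None
}
```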
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-lite/src/ietf/publisher.rs (1)
597-610: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Initial drain silently drops `Ended`, leaking stale namespaces to the peer.
`try_announced()` returns updates in queue order, so a path that went Active→Ended before this drain runs will yield both events back-to-back. The `if let` here handles only `Active`, so the code emits `Namespace(A)` and then drops `Ended(A)` on the floor. Since the original `Ended(A)` was already consumed from the consumer queue, the streaming loop below will never re-emit it, and the peer is left with a permanently stale namespace until some other event happens to A.
The streaming loop (lines 619-630) handles both variants symmetrically; the init drain should too.
🔧 Suggested fix
 // Send initial NAMESPACE messages for currently active namespaces
 while let Some(update) = origin.try_announced() {
-	if let crate::OriginAnnounce::Active(path, _) = update {
-		let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path");
-		tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
-		stream.writer.encode(&ietf::Namespace::ID).await?;
-		stream
-			.writer
-			.encode(&ietf::Namespace {
-				suffix: suffix.to_owned(),
-			})
-			.await?;
-	}
+	match update {
+		crate::OriginAnnounce::Active(path, _) => {
+			let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
+			tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
+			stream.writer.encode(&ietf::Namespace::ID).await?;
+			stream.writer.encode(&ietf::Namespace { suffix }).await?;
+		}
+		crate::OriginAnnounce::Ended(path) => {
+			let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
+			tracing::debug!(broadcast = %origin.absolute(&path), "namespace_done");
+			stream.writer.encode(&ietf::NamespaceDone::ID).await?;
+			stream.writer.encode(&ietf::NamespaceDone { suffix }).await?;
+		}
+	}
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/ietf/publisher.rs` around lines 597 - 610, The initial drain currently only handles OriginAnnounce::Active and silently drops OriginAnnounce::Ended, leaking stale namespaces; update the while let loop over origin.try_announced() to match both OriginAnnounce::Active(path, _) and OriginAnnounce::Ended(path), and for the Ended case emit the NamespaceDone pair of messages (stream.writer.encode(&ietf::NamespaceDone::ID) and stream.writer.encode(&ietf::NamespaceDone { suffix }).await?) so the peer receives the Ended notification just like the streaming loop does; use the same suffix extraction via path.strip_prefix(&prefix).expect(...) and the same tracing/logging symbol (origin.absolute(&path)) to keep behavior consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Line 82: The insert into state.origin.announced (announced_id =
state.origin.announced.insert((path.to_string(), active))?) creates unbounded
growth because entries are never removed; fix by ensuring every announced_id is
removed or evicted after being consumed—either call
state.origin.announced.remove(announced_id) (or equivalent) in the consumer path
where the announcement is processed, or replace the backing storage with a
bounded/evicting structure (e.g., an LRU or time-windowed cache) and add pruning
logic; locate uses of announced_id and the consumer functions that read
announcements and add a removal/prune step to reclaim entries.
---
Outside diff comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 597-610: The initial drain currently only handles
OriginAnnounce::Active and silently drops OriginAnnounce::Ended, leaking stale
namespaces; update the while let loop over origin.try_announced() to match both
OriginAnnounce::Active(path, _) and OriginAnnounce::Ended(path), and for the
Ended case emit the NamespaceDone pair of messages
(stream.writer.encode(&ietf::NamespaceDone::ID) and
stream.writer.encode(&ietf::NamespaceDone { suffix }).await?) so the peer
receives the Ended notification just like the streaming loop does; use the same
suffix extraction via path.strip_prefix(&prefix).expect(...) and the same
tracing/logging symbol (origin.absolute(&path)) to keep behavior consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c364712f-96f3-40b8-bff2-c9ee4388048e
📒 Files selected for processing (3)
- rs/libmoq/src/origin.rs
- rs/moq-lite/src/ietf/publisher.rs
- rs/moq-lite/src/lite/publisher.rs
| let callback = entry.callback; | ||
|
|
||
| let announced_id = state.origin.announced.insert((path.to_string(), broadcast.is_some()))?; | ||
| let announced_id = state.origin.announced.insert((path.to_string(), active))?; |
Unbounded growth risk in origin.announced storage.
Each announce update inserts a new (String, bool) entry, but this module exposes no corresponding removal path for consumed announcement IDs. On long-lived/high-churn origins, this can grow indefinitely and eventually exhaust memory/IDs.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/libmoq/src/origin.rs` at line 82, The insert into state.origin.announced
(announced_id = state.origin.announced.insert((path.to_string(), active))?)
creates unbounded growth because entries are never removed; fix by ensuring
every announced_id is removed or evicted after being consumed—either call
state.origin.announced.remove(announced_id) (or equivalent) in the consumer path
where the announcement is processed, or replace the backing storage with a
bounded/evicting structure (e.g., an LRU or time-windowed cache) and add pruning
logic; locate uses of announced_id and the consumer functions that read
announcements and add a removal/prune step to reclaim entries.
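
If the finding still holds, one way to reclaim entries is to remove each one as it is consumed. The sketch below uses a hypothetical `Announced` store keyed by a `u32` id; it does not reflect the actual storage type behind `state.origin.announced`, and the `take` and `len` methods are assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical id-keyed store standing in for `state.origin.announced`.
#[derive(Default)]
pub struct Announced {
    next_id: u32,
    entries: HashMap<u32, (String, bool)>,
}

impl Announced {
    /// Stores an update and returns the id handed to the callback.
    pub fn insert(&mut self, path: &str, active: bool) -> u32 {
        let id = self.next_id;
        // Wrapping keeps the sketch simple; a real store would have to guard against id reuse.
        self.next_id = self.next_id.wrapping_add(1);
        self.entries.insert(id, (path.to_string(), active));
        id
    }

    /// Reads an update and removes it, so each id is reclaimed once consumed.
    pub fn take(&mut self, id: u32) -> Option<(String, bool)> {
        self.entries.remove(&id)
    }

    /// Number of updates that have been inserted but not yet consumed.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

With this shape the store only grows while announcements are outstanding, rather than for the lifetime of the origin.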
|
|
||
| let broadcast = broadcast.ok_or_else(|| anyhow::anyhow!("broadcast unannounced: {path}"))?; | ||
| let (path, broadcast) = loop { | ||
| match consumer.next().await { |
.context("origin closed")
| // NOTE: The path is empty because we're using the URL to scope the broadcast. | ||
| // OPTIONAL: We publish after inserting the tracks just to avoid a nearly impossible race condition. | ||
| origin.publish_broadcast("", broadcast.consume()); | ||
| origin.publish("", broadcast.consume()); |
I think go back to publish_broadcast, at least to reduce the number of breaking changes.
| while let Some((path, broadcast)) = consumer.announced().await { | ||
| while let Some(update) = consumer.next().await { | ||
| let (path, active) = match update { | ||
| moq_lite::OriginUpdate::Active(p, _) => (p, true), | ||
| moq_lite::OriginUpdate::Ended(p) => (p, false), | ||
| }; |
I don't know if I like this new API. It seems like the same thing but with extra steps?
Also announced() is better than next(). IMO revert it to reduce the number of breaking changes.
|
|
||
| let broadcast = broadcast.ok_or_else(|| anyhow::anyhow!("broadcast unannounced: {path}"))?; | ||
| let (path, broadcast) = loop { | ||
| match consumer.announced().await { |
I think this API is worse?
Resolves merge conflict in ietf/publisher.rs from main and reverts `OriginAnnounce` from the new enum (`Active(path, broadcast)` / `Ended(path)`) back to the main-compatible tuple type alias `(PathOwned, Option<BroadcastConsumer>)`, where `Some` means active and `None` means ended. This reduces breaking changes vs main and removes the extra loop the enum forced on subscribers (per review feedback). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
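
A small sketch of how a caller might read the tuple form, where `Some` means active and `None` means ended. The `Broadcast` and `Announce` aliases and the `describe` function are hypothetical stand-ins; the real alias is `(PathOwned, Option<BroadcastConsumer>)`.

```rust
/// Hypothetical stand-ins for `BroadcastConsumer` and the `OriginAnnounce` tuple alias.
pub type Broadcast = u32;
pub type Announce = (String, Option<Broadcast>);

/// Describes an update: `Some` means the path is active, `None` means it ended.
pub fn describe(update: Announce) -> String {
    match update {
        (path, Some(broadcast)) => format!("active {path} -> {broadcast}"),
        (path, None) => format!("ended {path}"),
    }
}
```

Because the tuple destructures directly in a `while let Some((path, broadcast))` loop, subscribers do not need a separate match step just to get at the path.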
Summary
Replaces the `OriginNode`/`NotifyNode` tree, per-publish `web_async::spawn` cleanup, and per-consumer `mpsc` fan-out with a flat `HashMap<PathOwned, Entry>` behind a `Mutex` plus per-consumer queues. Consumers register a single `conducer::Waiter` on both the shared state and each tracked broadcast's `poll_closed`, so a broadcast closing wakes its consumers directly: no spawned cleanup tasks, no `tokio::time::sleep(1ms)` in tests, no 127-message tokio-mpsc bug.
Active/Ended semantics, shortest-hop preference, `is_clone` dedup, and "newer wins ties" are all preserved. Active selection stays in the producer so relay forwarders don't have to reimplement the bookkeeping.
`OriginAnnounce` is now an enum (`Active(path, broadcast)` / `Ended(path)`) instead of a tuple, but the existing method names on main (`publish_broadcast`, `create_broadcast`, `announced`, `try_announced`, `announced_broadcast`, `scope`, `with_root`) are preserved so call sites only had to change at the match.
Also exposes `BroadcastConsumer::poll_closed` and `is_closed` so callers that need to compose close-detection without spawning have a primitive.
Test plan
- `cargo test -p moq-lite --lib model::origin`: origin tests pass
- `cargo test --workspace`: all unit + integration tests pass
- `cargo build --workspace`
- `just check`
🤖 Generated with Claude Code
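
The flat-map-plus-queues layout described in the summary can be sketched roughly as follows. This is a simplified, std-only illustration, not the PR's implementation: waker registration, hop preference, and scoping are omitted, the `u32` payload stands in for a broadcast, and every name here (`Origin`, `State`, `Update`, `end`, `try_next`) is hypothetical.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

/// An update queued for one consumer: `true` means active, `false` means ended.
pub type Update = (String, bool);

#[derive(Default)]
struct State {
    /// Flat map of active paths (the real entry type carries a broadcast).
    active: HashMap<String, u32>,
    /// One unbounded queue per consumer, so publishing never blocks.
    queues: Vec<VecDeque<Update>>,
}

#[derive(Clone, Default)]
pub struct Origin {
    state: Arc<Mutex<State>>,
}

impl Origin {
    /// Registers a consumer and replays the currently active paths into its queue.
    pub fn consume(&self) -> usize {
        let mut state = self.state.lock().unwrap();
        let replay: VecDeque<Update> = state.active.keys().map(|p| (p.clone(), true)).collect();
        state.queues.push(replay);
        state.queues.len() - 1
    }

    /// Marks a path active and fans the update out to every consumer queue.
    pub fn publish(&self, path: &str, broadcast: u32) {
        let mut state = self.state.lock().unwrap();
        state.active.insert(path.to_string(), broadcast);
        for queue in state.queues.iter_mut() {
            queue.push_back((path.to_string(), true));
        }
    }

    /// Marks a path ended, if it was active, and notifies every consumer queue.
    pub fn end(&self, path: &str) {
        let mut state = self.state.lock().unwrap();
        if state.active.remove(path).is_some() {
            for queue in state.queues.iter_mut() {
                queue.push_back((path.to_string(), false));
            }
        }
    }

    /// Non-blocking pop of the next update for the given consumer.
    pub fn try_next(&self, consumer: usize) -> Option<Update> {
        let mut state = self.state.lock().unwrap();
        state.queues[consumer].pop_front()
    }
}
```

In this shape, a consumer that joins late still sees every active path through the replay, and all bookkeeping happens under one lock without spawned tasks.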