feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396
feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396metapox wants to merge 1 commit into
Conversation
|
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
WalkthroughThis PR adds support for dynamic track priority updates in the MOQ subscriber-publisher system. The 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@js/lite/src/lite/subscriber.ts`:
- Around line 193-209: The priorityWatch loop in the priority watcher (the async
IIFE named priorityWatch) must be rewritten to avoid listener leaks and missed
updates: before attaching a listener via
request.track.state.priority.changed(resolve) first check the current priority
snapshot (e.g., read the priority getter/value) so an update that happened
already is handled immediately; if you attach a listener ensure its
dispose/unsubscribe is always called in every path (use try/finally around the
awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 14817bcb-af2f-4924-8e95-029b6bca2793
📒 Files selected for processing (3)
js/lite/src/lite/publisher.tsjs/lite/src/lite/subscriber.tsjs/lite/src/track.ts
| const priorityWatch = (async () => { | ||
| for (;;) { | ||
| let dispose!: () => void; | ||
| const priority = await new Promise<number | undefined>((resolve) => { | ||
| dispose = request.track.state.priority.changed(resolve); | ||
| }); | ||
| if (priority === undefined) { | ||
| dispose(); | ||
| continue; | ||
| } | ||
| const update = new SubscribeUpdate({ priority }); | ||
| await update.encode(stream.writer, this.version); | ||
| console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`); | ||
| } | ||
| })(); | ||
|
|
||
| await Promise.race([stream.reader.closed, request.track.closed, priorityWatch]); |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In the latest @moq/signals API, does Signal.changed(callback) remain subscribed until the returned dispose function is called?
💡 Result:
Yes. In the latest @moq/signals API, calling Signal.changed(callback) returns a “dispose” function, and the subscription stays active until that dispose function is called. This is consistent with the library’s standard subscribe pattern: adapters/hooks in the repo use a subscription callback that returns an unsubscribe/dispose function to React’s useSyncExternalStore subscription mechanism (i.e., subscription lifetime is tied to the returned disposer) [1]. The docs site for @moq/signals also emphasizes the explicit subscription mechanism used by effects, rather than implicit global listeners, aligning with disposer-controlled teardown [2] [3].
Citations:
- 1: https://github.com/moq-dev/moq/blob/main/js/signals/src/react.ts
- 2: https://www.jsdelivr.com/package/npm/@moq/signals
- 3: https://doc.moq.dev/js/
🏁 Script executed:
# First, find the file to confirm it exists
find . -name "subscriber.ts" -path "*/lite/*" | head -20Repository: moq-dev/moq
Length of output: 121
🏁 Script executed:
# Examine the file around lines 193-209 to verify the issue
sed -n '180,220p' ./js/lite/src/lite/subscriber.ts | cat -nRepository: moq-dev/moq
Length of output: 1877
🏁 Script executed:
# Check for Signal.changed usage elsewhere in the file to understand the pattern
rg "Signal.changed\|changed\(" ./js/lite/src/lite/subscriber.ts -B 2 -A 2Repository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Also check the StateSignal or Signal type definition to understand its behavior
fd -e ts -e tsx -path "*signals*" | head -10Repository: moq-dev/moq
Length of output: 224
🏁 Script executed:
# Search for Signal or state.priority definitions to understand the changed() behavior
rg "state\.priority|\.changed\(" ./js/lite/src/lite/subscriber.ts -B 1 -A 1Repository: moq-dev/moq
Length of output: 202
🏁 Script executed:
# Find the Signal type definition in the `@moq/signals` library
find . -name "*.ts" -o -name "*.tsx" | xargs grep -l "changed.*\(.*resolve" | head -5Repository: moq-dev/moq
Length of output: 79
🏁 Script executed:
# Check what happens when priorityWatch promise resolves - verify if dispose is called
sed -n '30,35p' ./js/lite/src/lite/subscriber.ts | cat -nRepository: moq-dev/moq
Length of output: 197
🏁 Script executed:
# Search for peek() method usage to understand the pattern for checking current state
rg "\.peek\(" ./js/lite/src/ | head -5Repository: moq-dev/moq
Length of output: 438
🏁 Script executed:
# Verify the exact control flow - check what happens after the Promise.race on line 30
sed -n '190,240p' ./js/lite/src/lite/subscriber.ts | cat -nRepository: moq-dev/moq
Length of output: 2035
🏁 Script executed:
# Search for how updatePriority is called to understand if early updates are possible
rg "updatePriority" ./js/lite/src/lite/ -B 2 -A 2Repository: moq-dev/moq
Length of output: 431
Fix priority watcher lifecycle to prevent listener buildup and dropped updates.
The current implementation has two critical issues:
-
Listener accumulation: On line 197, each loop iteration registers a new listener via
request.track.state.priority.changed(resolve), but disposal only happens on theundefinedpath (line 200). Whenpriority !== undefined, the listener is never disposed, causing accumulation with each update. -
Dropped early updates: If
updatePriority()is called before the listener starts on line 197, that change is missed. The listener only captures changes after registration. -
Resource leak on close: When
Promise.race()resolves from stream or track closure (line 209), thepriorityWatchloop never exits, leaving the final listener active indefinitely.
Proposed fix
// Watch for priority changes and send SUBSCRIBE_UPDATE.
+ const sendPriorityUpdate = async (priority: number) => {
+ const update = new SubscribeUpdate({ priority });
+ await update.encode(stream.writer, this.version);
+ console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`);
+ };
+
+ // Flush any priority set before the watcher starts.
+ const pendingPriority = request.track.state.priority.peek();
+ if (pendingPriority !== undefined && pendingPriority !== request.priority) {
+ await sendPriorityUpdate(pendingPriority);
+ }
+
const priorityWatch = (async () => {
+ const stop = Symbol("stop");
for (;;) {
let dispose!: () => void;
- const priority = await new Promise<number | undefined>((resolve) => {
+ const priority = await Promise.race([
+ new Promise<number | undefined>((resolve) => {
+ dispose = request.track.state.priority.changed(resolve);
+ }),
+ Promise.race([stream.reader.closed, request.track.closed]).then(() => stop),
+ ]);
+ dispose();
+
+ if (priority === stop) break;
+ if (priority === undefined) continue;
+ await sendPriorityUpdate(priority);
- dispose = request.track.state.priority.changed(resolve);
- });
- if (priority === undefined) {
- dispose();
- continue;
- }
- const update = new SubscribeUpdate({ priority });
- await update.encode(stream.writer, this.version);
- console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`;
}
})();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@js/lite/src/lite/subscriber.ts` around lines 193 - 209, The priorityWatch
loop in the priority watcher (the async IIFE named priorityWatch) must be
rewritten to avoid listener leaks and missed updates: before attaching a
listener via request.track.state.priority.changed(resolve) first check the
current priority snapshot (e.g., read the priority getter/value) so an update
that happened already is handled immediately; if you attach a listener ensure
its dispose/unsubscribe is always called in every path (use try/finally around
the awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.
There was a problem hiding this comment.
I don't know if the first issue is real; I think dispose only needs to be called if the callback hasn't been fired yet.
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find and examine the Signal.changed() implementation in `@moq/signals`
fd -e ts signal.ts --full-path | head -10
rg -n "changed" js/signals/src/ -A 5 -B 2
Length of output: 4535
@kixelated, you're right — I apologize for the incorrect analysis. Looking at the actual Signal.changed() implementation in js/signals/src/index.ts:
changed(fn: (value: T) => void): Dispose {
this.#changed.add(fn);
return () => this.#changed.delete(fn);
}And the flush logic (lines 109–126):
const changed = this.#changed;
this.#changed = new Set(); // ← set is swapped out before callbacks run
for (const fn of changed) {
fn(value); // callback invoked from the old set
}The #changed set is replaced with a fresh empty Set before any callbacks are invoked. So once the callback fires and resolve is called, the listener has already been evicted from #changed. Calling dispose() afterward does this.#changed.delete(fn) on the new empty set — a no-op. There is no listener accumulation. Your understanding is correct: dispose() only needs to be called if the callback hasn't fired yet (e.g., to cancel a pending listener on early teardown), which is exactly what the try/finally in the current file content handles.
🧬 Code Graph Analysis Results
Snippet 1: priorities update disposal in Subscriber.runSubscribe
-
Filename: js/lite/src/lite/subscriber.ts
-
Approximate location: within the
#runSubscribemethod (priority watch block) -
Contents (complete as in the relevant code block):
const priorityWatch = (async () => {
for (;;) {
let dispose!: () => void;
try {
const priority = await new Promise<number | undefined>((resolve) => {
dispose = request.track.state.priority.changed(resolve);
});
if (priority === undefined) continue;
const update = new SubscribeUpdate({ priority });
await update.encode(stream.writer, this.version);
console.debug(
subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority},
);
} finally {
dispose();
}
}
})(); -
Purpose of snippet:
Demonstrates how the code subscribes to a per-track priority signal, obtains a disposer from the signal, waits for a priority change (or undefined), and then issues a SUBSCRIBE_UPDATE. The disposer is invoked in a finally block to unregister the listener.
Notes for reviewer:
- The user’s comment asks whether dispose should be called only if the callback hasn’t fired yet. This snippet shows the current pattern: dispose() is invoked unconditionally in finally, regardless of whether the priority callback already fired. For clarity, see the accompanying pattern in the same block where the disposer is obtained from request.track.state.priority.changed and then disposed in finally. If the callback has already fired, the disposer’s behavior depends on the implementation of the signal’s disposer (often it is idempotent, but that isn’t guaranteed by all signal implementations).
✏️ Learnings added
Learnt from: kixelated
Repo: moq-dev/moq PR: 1396
File: js/lite/src/lite/subscriber.ts:193-212
Timestamp: 2026-05-15T16:52:56.228Z
Learning: In `js/signals/src/index.ts`, `Signal.changed(fn)` is a **one-time** listener. During flush, `this.#changed` is replaced with a new empty Set before iterating the old set and firing callbacks. This means calling `dispose()` after the callback has already fired is always a no-op — the listener is auto-removed on firing. Listener accumulation is not a concern; `dispose()` is only needed to cancel a still-pending (unfired) listener.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
8f672bf to
83d79b4
Compare
…isher - Track: add priority Signal and updatePriority() method - Subscriber: watch for priority changes after SubscribeOk, send SubscribeUpdate - Publisher: apply received SubscribeUpdate priority to track (was TODO)
83d79b4 to
a08dcb3
Compare
| const priority = await new Promise<number | undefined>((resolve) => { | ||
| dispose = request.track.state.priority.changed(resolve); | ||
| }); |
There was a problem hiding this comment.
Correct, but feels gross. Open to suggestions to improve moq/signals too.
Closes #1363
Problem
The JS subscriber has no API to change priority after subscribing. The only option is to close and re-subscribe, which causes the relay to replay cached groups.
Changes
updatePriority(priority)method that updates an internal SignalSubscribeUpdatewhen it changesSubscribeUpdatepriority to the track (implements existing TODO)Usage
Context
I'm building moq-multicam, a multi-camera streaming system. Camera switching needs instant priority changes without re-subscribe to avoid replaying stale groups from the relay cache.