Fix OrderedConsumer... for good? #1054

Merged
kixelated merged 10 commits into main from kixelated/consume-from-zero
Mar 5, 2026

Conversation

@kixelated
Collaborator

@kixelated kixelated commented Mar 5, 2026

No description provided.

kixelated and others added 6 commits March 4, 2026 11:57
Replace the fragile tokio::select! + FuturesUnordered machinery in
OrderedConsumer with a single poll loop driven by conducer::Waiter.
A single waiter registers on all relevant channels, eliminating the
need for buffer_until's infinite blocking and the associated edge cases.

- Add poll_next_group, poll_next_frame, poll_read_all to moq-lite consumers
- Replace futures dep with conducer in hang
- Rewrite GroupReader with two-phase frame reading and lazy timestamp discovery
- Latency skip compares pending timestamps against max_timestamp + latency

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
TrackProducer::consume() previously started at the latest group, but
this policy belongs in publishers, not the consumer itself. Starting
at index 0 prepares for future DeliveryTimeout support where skip
decisions depend on when groups were received.

- Change consume() to start at index 0 in both TrackProducer and TrackWeak
- Add Waiter::noop() to conducer for non-blocking poll draining
- Publishers (lite + ietf) now skip to latest group on subscribe
- Add comprehensive tests for frame.rs (9 tests) and group.rs (8 tests)
- Add OrderedConsumer tests for startup behavior and edge cases (8 tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
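The commit's split between mechanism and policy — consume() always starts at index 0, while publishers opt into skipping — can be sketched with a toy cursor. Track, Consumer, next, and skip_to_latest below are hypothetical stand-ins for illustration, not the actual moq-lite API:

```rust
// Toy model: consume() is policy-free and always starts at index 0;
// "skip to latest" is a decision the caller (the publisher) makes.
struct Track {
    groups: Vec<&'static str>,
}

struct Consumer<'a> {
    track: &'a Track,
    index: usize,
}

impl Track {
    // Always starts at the first group.
    fn consume(&self) -> Consumer<'_> {
        Consumer { track: self, index: 0 }
    }
}

impl<'a> Consumer<'a> {
    fn next(&mut self) -> Option<&'static str> {
        let group = self.track.groups.get(self.index).copied();
        if group.is_some() {
            self.index += 1;
        }
        group
    }

    // Publisher-side policy: jump the cursor to the newest group.
    fn skip_to_latest(&mut self) {
        self.index = self.track.groups.len().saturating_sub(1);
    }
}

fn main() {
    let track = Track {
        groups: vec!["g0", "g1", "g2"],
    };

    // A fresh consumer sees history from the beginning...
    let mut from_zero = track.consume();
    assert_eq!(from_zero.next(), Some("g0"));

    // ...while a publisher that wants live-edge behavior skips forward itself.
    let mut latest = track.consume();
    latest.skip_to_latest();
    assert_eq!(latest.next(), Some("g2"));
}
```

Keeping the skip in the publisher is what makes a future DeliveryTimeout feasible: the consumer retains the full range, and skip decisions can depend on receive-time metadata.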
@coderabbitai
Contributor

coderabbitai bot commented Mar 5, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Polling APIs in conducer were changed from Option-based to Result-based, updating consumer/producer poll signatures to Poll<Result<..., Ref<'_, T>>> and adapting wait paths. Waiter::noop() was added. rs/hang now depends on the workspace conducer, replaces GroupReader with GroupBuffer, and adopts poll-based buffering and startup logic. moq-lite refactored frames, groups, tracks, publishers/subscribers, and broadcasts to poll-based, non-blocking helpers and adjusted async signatures. Conducer package version bumped to 0.3.0.
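The Option-to-Result migration described above can be illustrated with a simplified poll method. Channel, Closed, and poll_value are illustrative stand-ins — not conducer's actual Waiter/Ref types — assuming the contract the walkthrough describes: Pending while open and empty, Ready(Err(...)) on close instead of Ready(None):

```rust
use std::task::Poll;

// Stand-in for the close guard (conducer returns a `Ref<'_, T>` here).
#[derive(Debug, Clone, PartialEq)]
struct Closed;

struct Channel {
    value: Option<u64>,
    closed: bool,
}

impl Channel {
    // Result-based contract: a closed channel is an error, not `None`,
    // so callers can no longer confuse "closed" with "nothing yet".
    fn poll_value(&self) -> Poll<Result<u64, Closed>> {
        if let Some(v) = self.value {
            Poll::Ready(Ok(v))
        } else if self.closed {
            Poll::Ready(Err(Closed))
        } else {
            Poll::Pending
        }
    }
}

fn main() {
    let open = Channel { value: None, closed: false };
    assert_eq!(open.poll_value(), Poll::Pending);

    let done = Channel { value: None, closed: true };
    assert_eq!(done.poll_value(), Poll::Ready(Err(Closed)));

    let ready = Channel { value: Some(7), closed: false };
    assert_eq!(ready.poll_value(), Poll::Ready(Ok(7)));
}
```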

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Description check — ⚠️ Warning: No pull request description was provided by the author, making it impossible to assess whether the description relates to the changeset. Resolution: provide a detailed description explaining the motivation, the changes made, and any breaking changes or migration notes for consumers of the affected APIs.
  • Title check — ❓ Inconclusive: The title "Fix OrderedConsumer... for good?" is vague and uses ellipsis notation, making it unclear what specific issue is being fixed or what the scope of changes entails beyond OrderedConsumer. Resolution: replace it with a more specific title, such as "Refactor OrderedConsumer to use poll-based buffering with conducer integration".

✅ Passed checks (1 passed)
  • Docstring Coverage — ✅ Passed: Docstring coverage is 100.00%, which meets the required threshold of 80.00%.





@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/conducer/src/producer.rs (1)

68-73: ⚠️ Potential issue | 🟡 Minor

Update poll docs to match the new return contract.

Line 72 still references None on close, but poll now returns Poll::Ready(Err(Ref)) in that case.

📝 Doc fix
-	/// Returns `None` if the channel is closed.
+	/// Returns `Poll::Ready(Err(Ref))` if the channel is closed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/conducer/src/producer.rs` around lines 68 - 73, The doc comment for pub fn
poll<F, R>(&self, waiter: &Waiter, mut f: F) -> Poll<Result<R, Ref<'_, T>>> is
out of date: it still says "Returns `None` if the channel is closed." Update the
documentation to reflect the current return contract by describing that when the
channel is closed the method returns Poll::Ready(Err(Ref)) (i.e., a ready Err
containing a Ref guard), and adjust the surrounding text to use Poll/Result/Ref
terminology (mention Waiter, Mut, and Ref) so readers understand that Pending
registers the waiter and Ready(Err(Ref)) represents close instead of None.
🧹 Nitpick comments (2)
rs/moq-lite/src/lite/publisher.rs (1)

302-305: Limit noop waiter lifetime to the drain pass.

Line 303/304 can leave a live no-op waiter registration for the rest of run_track. Scoping noop to the drain block lets that weak entry become collectible sooner.

♻️ Suggested tweak
-		let noop = conducer::Waiter::noop();
-		while let Poll::Ready(Ok(Some(_group))) = track.poll_next_group(&noop) {}
+		{
+			let noop = conducer::Waiter::noop();
+			while let Poll::Ready(Ok(Some(_group))) = track.poll_next_group(&noop) {}
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 302 - 305, The noop waiter
created by let noop = conducer::Waiter::noop() remains live for the entire
run_track, keeping a weak registration; instead, limit its lifetime to the drain
pass by scoping it to the loop/block that calls track.poll_next_group — e.g.,
create the noop inside the drain loop or an enclosing block so it is dropped
immediately after the while finishes so the weak entry can be collected; update
the code around conducer::Waiter::noop() and track.poll_next_group(...)
accordingly.
rs/moq-lite/src/ietf/publisher.rs (1)

199-201: Scope the noop waiter to avoid a long-lived no-op registration.

This has the same lifetime pattern as the Lite publisher drain: keep noop local to the skip loop so the weak registration can be cleaned earlier.

♻️ Suggested tweak
-		let noop = conducer::Waiter::noop();
-		while let Poll::Ready(Ok(Some(_group))) = track.poll_next_group(&noop) {}
+		{
+			let noop = conducer::Waiter::noop();
+			while let Poll::Ready(Ok(Some(_group))) = track.poll_next_group(&noop) {}
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/publisher.rs` around lines 199 - 201, Move the noop
Waiter creation into the skip loop so its weak registration is short-lived:
instead of creating a single let noop = conducer::Waiter::noop() outside the
loop, create the noop inside each iteration when calling track.poll_next_group
(e.g. construct a new conducer::Waiter::noop() for each poll). Update the loop
using track.poll_next_group(&noop) so the noop's lifetime ends at the end of
each iteration, allowing the weak registration to be cleaned promptly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/conducer/src/consumer.rs`:
- Around line 30-31: Update the doc comment for the poll method to clarify the
error condition: change the description for `Err(Ref)` on `pub fn poll<F,
R>(&self, waiter: &Waiter, mut f: F) -> Poll<Result<R, Ref<'_, T>>>` to state
that it is returned when the channel has been closed while the condition is
still pending (rather than saying “channel and condition is still Pending”);
reference `poll`, `Ref<'_, T>`, and `Waiter` in the text to make it clear the
error represents channel closure with an unresolved condition.

In `@rs/hang/src/container/consumer.rs`:
- Around line 335-353: poll_min_timestamp and poll_max_timestamp currently
return Poll::Pending if no timestamp is buffered, which deadlocks for
finished-but-empty groups; change both methods (poll_min_timestamp and
poll_max_timestamp) to check the group's finished/errored state after calling
buffer_one/buffer_all (use the same place you inspect self.min_timestamp /
self.max_timestamp) and if the group is finished or has an error return
Poll::Ready(Err(...)) with an appropriate Error value instead of Poll::Pending;
ensure you still return Poll::Pending only when the group is neither finished
nor errored and no timestamp is available.

In `@rs/moq-lite/src/model/frame.rs`:
- Around line 112-119: The poll_read_all_chunks function can panic when called
with index == usize::MAX (set elsewhere), because it slices
self.chunks[index..]; change the function to clamp the start index to the chunks
length before slicing (e.g., compute let start = index.min(self.chunks.len())
and return Ok(&self.chunks[start..])) so out-of-bounds accesses are avoided;
apply the same guarded/clamped slicing approach to the other similar path around
the code referenced (the repeated read_all path at lines ~252-255) and keep the
existing abort/remaining checks intact in poll_read_all_chunks.

In `@rs/moq-lite/src/model/track.rs`:
- Around line 73-79: The terminal-state checks currently return completion when
self.final_sequence.is_some() before checking self.abort, which can mask abort
errors; change the order so you first check if let Some(err) = &self.abort and
return Poll::Ready(Err(err.clone())), then check final_sequence and return
Poll::Ready(Ok(None)); apply this same reorder to the other similar blocks
identified (the checks around lines 91-100 and 105-113) so abort is always
prioritized over final_sequence in the relevant methods (look for usages of
self.final_sequence and self.abort in track.rs).

---

Outside diff comments:
In `@rs/conducer/src/producer.rs`:
- Around line 68-73: The doc comment for pub fn poll<F, R>(&self, waiter:
&Waiter, mut f: F) -> Poll<Result<R, Ref<'_, T>>> is out of date: it still says
"Returns `None` if the channel is closed." Update the documentation to reflect
the current return contract by describing that when the channel is closed the
method returns Poll::Ready(Err(Ref)) (i.e., a ready Err containing a Ref guard),
and adjust the surrounding text to use Poll/Result/Ref terminology (mention
Waiter, Mut, and Ref) so readers understand that Pending registers the waiter
and Ready(Err(Ref)) represents close instead of None.

---

Nitpick comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 199-201: Move the noop Waiter creation into the skip loop so its
weak registration is short-lived: instead of creating a single let noop =
conducer::Waiter::noop() outside the loop, create the noop inside each iteration
when calling track.poll_next_group (e.g. construct a new
conducer::Waiter::noop() for each poll). Update the loop using
track.poll_next_group(&noop) so the noop's lifetime ends at the end of each
iteration, allowing the weak registration to be cleaned promptly.

In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 302-305: The noop waiter created by let noop =
conducer::Waiter::noop() remains live for the entire run_track, keeping a weak
registration; instead, limit its lifetime to the drain pass by scoping it to the
loop/block that calls track.poll_next_group — e.g., create the noop inside the
drain loop or an enclosing block so it is dropped immediately after the while
finishes so the weak entry can be collected; update the code around
conducer::Waiter::noop() and track.poll_next_group(...) accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 89ea9e52-d129-4efd-be28-bf09c04be443

📥 Commits

Reviewing files that changed from the base of the PR and between d0117c4 and 5d6927c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (13)
  • rs/conducer/src/consumer.rs
  • rs/conducer/src/producer.rs
  • rs/conducer/src/waiter.rs
  • rs/hang/Cargo.toml
  • rs/hang/src/container/consumer.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/frame.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/track.rs

Comment on lines +73 to 79
 // TODO once we have drop notifications, check if index == final_sequence.
 if self.final_sequence.is_some() {
-	Poll::Ready(None)
+	Poll::Ready(Ok(None))
 } else if let Some(err) = &self.abort {
 	Poll::Ready(Err(err.clone()))
 } else {
 	Poll::Pending

⚠️ Potential issue | 🟠 Major

Abort can be masked by final_sequence in terminal-state checks.

These methods prioritize final_sequence over abort. If a track is finished and then aborted, callers may receive clean completion instead of the abort error.

🩹 Suggested fix (prioritize abort first)
 fn poll_next_group(&self, index: usize) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
@@
-	if self.final_sequence.is_some() {
-		Poll::Ready(Ok(None))
-	} else if let Some(err) = &self.abort {
+	if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.final_sequence.is_some() {
+		Poll::Ready(Ok(None))
 	} else {
 		Poll::Pending
 	}
 }
@@
 fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
@@
-	if let Some(fin) = self.final_sequence
-		&& sequence >= fin
-	{
-		return Poll::Ready(Ok(None));
-	}
-
 	if let Some(err) = &self.abort {
 		return Poll::Ready(Err(err.clone()));
 	}
+
+	if let Some(fin) = self.final_sequence
+		&& sequence >= fin
+	{
+		return Poll::Ready(Ok(None));
+	}
@@
 fn poll_closed(&self) -> Poll<Result<()>> {
-	if self.final_sequence.is_some() {
-		Poll::Ready(Ok(()))
-	} else if let Some(err) = &self.abort {
+	if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.final_sequence.is_some() {
+		Poll::Ready(Ok(()))
 	} else {
 		Poll::Pending
 	}
 }

Also applies to: 91-100, 105-113

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/track.rs` around lines 73 - 79, The terminal-state
checks currently return completion when self.final_sequence.is_some() before
checking self.abort, which can mask abort errors; change the order so you first
check if let Some(err) = &self.abort and return Poll::Ready(Err(err.clone())),
then check final_sequence and return Poll::Ready(Ok(None)); apply this same
reorder to the other similar blocks identified (the checks around lines 91-100
and 105-113) so abort is always prioritized over final_sequence in the relevant
methods (look for usages of self.final_sequence and self.abort in track.rs).
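The abort-first ordering this review asks for can be checked in isolation. The toy Track below mirrors the shape of the suggested fix under the assumption stated in the comment (abort must win over final_sequence); it is not the real moq-lite type:

```rust
use std::task::Poll;

#[derive(Debug, Clone, PartialEq)]
struct Error(&'static str);

struct Track {
    final_sequence: Option<u64>,
    abort: Option<Error>,
}

impl Track {
    // Abort-first ordering: a track that finished and was then aborted
    // reports the abort error rather than clean completion.
    fn poll_closed(&self) -> Poll<Result<(), Error>> {
        if let Some(err) = &self.abort {
            Poll::Ready(Err(err.clone()))
        } else if self.final_sequence.is_some() {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }
}

fn main() {
    // Both terminal states set: the abort must not be masked.
    let both = Track {
        final_sequence: Some(3),
        abort: Some(Error("reset")),
    };
    assert_eq!(both.poll_closed(), Poll::Ready(Err(Error("reset"))));

    // Finished cleanly: normal completion.
    let finished = Track {
        final_sequence: Some(3),
        abort: None,
    };
    assert_eq!(finished.poll_closed(), Poll::Ready(Ok(())));

    // Neither state: still pending.
    let live = Track {
        final_sequence: None,
        abort: None,
    };
    assert_eq!(live.poll_closed(), Poll::Pending);
}
```

With the original ordering (final_sequence checked first), the first assertion would fail: the aborted track would report Ok(()).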

@kixelated kixelated changed the title from "Start TrackConsumer at index 0, let publishers skip to latest" to "Fix OrderedConsumer... for good?" Mar 5, 2026
- Fix poll_min/max_timestamp deadlock on finished empty groups by
  checking group.poll_finished() and returning Ok(None)
- Add poll_finished/finished to GroupConsumer and TrackConsumer
- Add TrackConsumer::start_at/latest to replace noop waiter drain loop
- Fix poll_read_all_chunks panic when index > chunks.len()
- Update conducer doc comments to accurately describe Ref/Waiter/Poll

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
rs/moq-lite/src/model/track.rs (1)

75-83: ⚠️ Potential issue | 🟠 Major

Abort is still masked by final-state checks in terminal polling.

poll_next_group, poll_get_group, poll_closed, and poll_finished should check abort first; otherwise callers can observe clean completion when the track was aborted.

🩹 Suggested fix
 fn poll_next_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
@@
-	if self.final_sequence.is_some() {
-		Poll::Ready(Ok(None))
-	} else if let Some(err) = &self.abort {
+	if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.final_sequence.is_some() {
+		Poll::Ready(Ok(None))
 	} else {
 		Poll::Pending
 	}
 }

 fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
@@
-	if let Some(fin) = self.final_sequence
-		&& sequence >= fin
-	{
-		return Poll::Ready(Ok(None));
-	}
-
 	if let Some(err) = &self.abort {
 		return Poll::Ready(Err(err.clone()));
 	}
+
+	if let Some(fin) = self.final_sequence
+		&& sequence >= fin
+	{
+		return Poll::Ready(Ok(None));
+	}

 	Poll::Pending
 }

 fn poll_closed(&self) -> Poll<Result<()>> {
-	if self.final_sequence.is_some() {
-		Poll::Ready(Ok(()))
-	} else if let Some(err) = &self.abort {
+	if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.final_sequence.is_some() {
+		Poll::Ready(Ok(()))
 	} else {
 		Poll::Pending
 	}
 }

 fn poll_finished(&self) -> Poll<Result<u64>> {
-	if let Some(fin) = self.final_sequence {
-		Poll::Ready(Ok(fin))
-	} else if let Some(err) = &self.abort {
+	if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if let Some(fin) = self.final_sequence {
+		Poll::Ready(Ok(fin))
 	} else {
 		Poll::Pending
 	}
 }

Also applies to: 93-102, 107-115, 146-154

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/track.rs` around lines 75 - 83, The polling functions
poll_next_group, poll_get_group, poll_closed, and poll_finished currently return
terminal success when self.final_sequence.is_some() before checking self.abort,
which masks aborts; reorder the checks so you first test if let Some(err) =
&self.abort and return Poll::Ready(Err(err.clone())) if present, then check
self.final_sequence and return Poll::Ready(Ok(None)) otherwise, leaving
Poll::Pending as the fallback; apply the same change to the equivalent blocks
referenced around lines 93-102, 107-115, and 146-154 so abort is always observed
prior to reporting clean completion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/hang/src/container/consumer.rs`:
- Around line 170-172: The subtraction max_timestamp - min_timestamp can panic
on Duration underflow; in the block checking if let Some(new_idx) = min_idx &&
max_timestamp - min_timestamp >= self.max_latency { replace the direct
subtraction with a safe saturating subtraction (e.g., use
max_timestamp.saturating_sub(min_timestamp)) so the comparison never panics;
update the condition that references max_timestamp, min_timestamp, and
self.max_latency accordingly to use the saturating result.

In `@rs/moq-lite/src/model/frame.rs`:
- Around line 71-80: The poll_read_chunk implementation can return Ok(None) when
remaining == 0 even if an abort exists, masking terminal errors; update
poll_read_chunk (and the similar function handling trailers, e.g.,
poll_read_trailer) to check for self.abort before checking self.remaining == 0
so that an existing abort error is returned (Poll::Ready(Err(err.clone())))
instead of reporting normal completion; ensure the earlier chunk lookup
(self.chunks.get(index)) remains first, then check abort, then remaining==0,
otherwise Poll::Pending.

In `@rs/moq-lite/src/model/group.rs`:
- Around line 77-87: The terminal-state check order in poll_get_frame (and
similarly in poll_finished) returns Ok(None) when fin is true even if abort is
set; change the conditional order so you check self.abort before self.fin: if
let Some(err) = &self.abort { Poll::Ready(Err(err.clone())) } else if self.fin {
Poll::Ready(Ok(None)) } (keep the existing frames check first). Update both
poll_get_frame and poll_finished to prioritize abort and return the cloned error
when abort is present.

---

Duplicate comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 75-83: The polling functions poll_next_group, poll_get_group,
poll_closed, and poll_finished currently return terminal success when
self.final_sequence.is_some() before checking self.abort, which masks aborts;
reorder the checks so you first test if let Some(err) = &self.abort and return
Poll::Ready(Err(err.clone())) if present, then check self.final_sequence and
return Poll::Ready(Ok(None)) otherwise, leaving Poll::Pending as the fallback;
apply the same change to the equivalent blocks referenced around lines 93-102,
107-115, and 146-154 so abort is always observed prior to reporting clean
completion.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 15ae21aa-9891-4301-850b-4f478199e1a7

📥 Commits

Reviewing files that changed from the base of the PR and between 5d6927c and 57d3e48.

📒 Files selected for processing (8)
  • rs/conducer/src/consumer.rs
  • rs/conducer/src/producer.rs
  • rs/hang/src/container/consumer.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/model/frame.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/track.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/ietf/publisher.rs

Comment on lines +71 to 80
 fn poll_read_chunk(&self, index: usize) -> Poll<Result<Option<Bytes>>> {
 	if let Some(chunk) = self.chunks.get(index).cloned() {
-		Poll::Ready(Some(chunk))
+		Poll::Ready(Ok(Some(chunk)))
 	} else if self.remaining == 0 {
-		Poll::Ready(None)
+		Poll::Ready(Ok(None))
 	} else if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
 	} else {
 		Poll::Pending
 	}

⚠️ Potential issue | 🟠 Major

abort can be masked in chunk readers when remaining == 0.

These branches can report normal completion even when an abort exists, which hides terminal errors.

🩹 Suggested fix
 fn poll_read_chunk(&self, index: usize) -> Poll<Result<Option<Bytes>>> {
 	if let Some(chunk) = self.chunks.get(index).cloned() {
 		Poll::Ready(Ok(Some(chunk)))
-	} else if self.remaining == 0 {
-		Poll::Ready(Ok(None))
 	} else if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.remaining == 0 {
+		Poll::Ready(Ok(None))
 	} else {
 		Poll::Pending
 	}
 }

 fn poll_read_chunks(&self, index: usize) -> Poll<Result<Vec<Bytes>>> {
-	if index >= self.chunks.len() && self.remaining == 0 {
+	if let Some(err) = &self.abort {
+		Poll::Ready(Err(err.clone()))
+	} else if index >= self.chunks.len() && self.remaining == 0 {
 		Poll::Ready(Ok(Vec::new()))
 	} else if self.remaining == 0 {
 		Poll::Ready(Ok(self.chunks[index..].to_vec()))
-	} else if let Some(err) = &self.abort {
-		Poll::Ready(Err(err.clone()))
 	} else {
 		Poll::Pending
 	}
 }

Also applies to: 83-92

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/frame.rs` around lines 71 - 80, The poll_read_chunk
implementation can return Ok(None) when remaining == 0 even if an abort exists,
masking terminal errors; update poll_read_chunk (and the similar function
handling trailers, e.g., poll_read_trailer) to check for self.abort before
checking self.remaining == 0 so that an existing abort error is returned
(Poll::Ready(Err(err.clone()))) instead of reporting normal completion; ensure
the earlier chunk lookup (self.chunks.get(index)) remains first, then check
abort, then remaining==0, otherwise Poll::Pending.

Comment on lines +77 to +87
 fn poll_get_frame(&self, index: usize) -> Poll<Result<Option<FrameConsumer>>> {
 	if let Some(frame) = self.frames.get(index) {
-		Poll::Ready(Some(frame.clone()))
+		Poll::Ready(Ok(Some(frame.consume())))
 	} else if self.fin {
-		Poll::Ready(None)
+		Poll::Ready(Ok(None))
 	} else if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
 	} else {
 		Poll::Pending
 	}
 }

⚠️ Potential issue | 🟠 Major

Prioritize abort before fin in terminal-state checks.

poll_get_frame/poll_finished currently return clean completion when fin is set even if abort is also set, which can hide terminal errors.

🩹 Suggested fix
 fn poll_get_frame(&self, index: usize) -> Poll<Result<Option<FrameConsumer>>> {
 	if let Some(frame) = self.frames.get(index) {
 		Poll::Ready(Ok(Some(frame.consume())))
-	} else if self.fin {
-		Poll::Ready(Ok(None))
 	} else if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.fin {
+		Poll::Ready(Ok(None))
 	} else {
 		Poll::Pending
 	}
 }

 fn poll_finished(&self) -> Poll<Result<u64>> {
-	if self.fin {
-		Poll::Ready(Ok(self.frames.len() as u64))
-	} else if let Some(err) = &self.abort {
+	if let Some(err) = &self.abort {
 		Poll::Ready(Err(err.clone()))
+	} else if self.fin {
+		Poll::Ready(Ok(self.frames.len() as u64))
 	} else {
 		Poll::Pending
 	}
 }

Also applies to: 89-97

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/group.rs` around lines 77 - 87, The terminal-state
check order in poll_get_frame (and similarly in poll_finished) returns Ok(None)
when fin is true even if abort is set; change the conditional order so you check
self.abort before self.fin: if let Some(err) = &self.abort {
Poll::Ready(Err(err.clone())) } else if self.fin { Poll::Ready(Ok(None)) } (keep
the existing frames check first). Update both poll_get_frame and poll_finished
to prioritize abort and return the cloned error when abort is present.

kixelated and others added 2 commits March 5, 2026 12:25
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
VecDeque with partition_point is the right choice here since the code
relies on ordered iteration, range drains, and front access.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (2)
rs/hang/src/container/consumer.rs (2)

170-172: ⚠️ Potential issue | 🔴 Critical

Use saturating_sub for latency-span comparison to avoid panic.

Line 171 can panic when max_timestamp < min_timestamp due to Duration underflow on subtraction.

Proposed fix
-			if let Some(new_idx) = min_idx
-				&& max_timestamp - min_timestamp >= self.max_latency
-			{
+			if let Some(new_idx) = min_idx
+				&& max_timestamp.saturating_sub(min_timestamp) >= self.max_latency
+			{
#!/bin/bash
# Verify direct Duration subtraction is still used in latency-span logic.
rg -nP 'max_timestamp\s*-\s*min_timestamp' rs/hang/src/container/consumer.rs
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/container/consumer.rs` around lines 170 - 172, The comparison
currently performs direct subtraction (max_timestamp - min_timestamp) which can
panic on Duration underflow; update the condition in the consumer logic (the
branch using min_idx, max_timestamp, min_timestamp, and self.max_latency) to use
saturating_sub — i.e., replace the subtraction with
max_timestamp.saturating_sub(min_timestamp) so the duration difference never
underflows and the check against self.max_latency remains correct.
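The underflow behavior behind this fix is easy to reproduce standalone: subtracting a larger Duration from a smaller one panics in debug and release builds alike, while saturating_sub clamps the result to zero:

```rust
use std::time::Duration;

fn main() {
    let max = Duration::from_millis(100);
    let min = Duration::from_millis(250);

    // Direct subtraction would panic here because max < min:
    // let _span = max - min; // panics: overflow when subtracting durations

    // saturating_sub clamps to zero instead of panicking.
    let span = max.saturating_sub(min);
    assert_eq!(span, Duration::ZERO);

    // When no underflow occurs, it behaves like plain subtraction.
    let ok = min.saturating_sub(max);
    assert_eq!(ok, Duration::from_millis(150));
}
```

Clamping to zero is the right semantics for the latency-span check: a zero span is trivially below max_latency, so out-of-order timestamps simply never trigger a skip.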

150-156: ⚠️ Potential issue | 🔴 Critical

Finished-empty future groups can still leave the poll loop parked forever across sequence gaps.

When finished is true and the next pending group has no frames (poll_min_timestamp -> Ready(None)), min_idx stays None and Line 186 returns Pending without a wake source.

Proposed fix
 			if let Some(new_idx) = min_idx
 				&& max_timestamp - min_timestamp >= self.max_latency
 			{
 				self.pending.drain(0..new_idx);
 				let new_current = self.pending.front().map(|g| g.info.sequence).unwrap();

 				tracing::debug!(old = self.current, new = new_current, "skipping slow groups");

 				self.current = new_current;
 				continue;
 			}
 
+			// If the track is finished, explicitly advance over finished-empty groups
+			// that are ahead of `current` so we don't park forever on a missing sequence.
+			if finished {
+				if let Some(front) = self.pending.front_mut()
+					&& front.info.sequence > self.current
+					&& matches!(front.poll_min_timestamp(waiter)?, Poll::Ready(None))
+				{
+					self.current = front.info.sequence;
+					continue;
+				}
+			}
+
 			if finished && self.pending.is_empty() {
 				return Poll::Ready(Ok(None));
 			}

Also applies to: 182-186

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/container/consumer.rs` around lines 150 - 156, The poll loop can
return Pending with no wake source when poll_min_timestamp returns Ready(None)
for a finished group; fix by treating Ready(None) from a finished group as a
valid min candidate: when group.poll_min_timestamp(waiter)? yields
Poll::Ready(None) and the group is finished, set min_idx = Some(i) (and break)
so the loop will register the group's waker (or otherwise ensure a wake source).
Apply the same change in both places handling Poll::Ready(None) around the
poll_min_timestamp call (the block that updates min_timestamp/min_idx and the
analogous 182-186 region) so empty finished groups do not leave the loop parked
forever.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@rs/hang/src/container/consumer.rs`:
- Around line 170-172: The comparison currently performs direct subtraction
(max_timestamp - min_timestamp) which can panic on Duration underflow; update
the condition in the consumer logic (the branch using min_idx, max_timestamp,
min_timestamp, and self.max_latency) to use saturating_sub — i.e., replace the
subtraction with max_timestamp.saturating_sub(min_timestamp) so the duration
difference never underflows and the check against self.max_latency remains
correct.
- Around line 150-156: The poll loop can return Pending with no wake source when
poll_min_timestamp returns Ready(None) for a finished group; fix by treating
Ready(None) from a finished group as a valid min candidate: when
group.poll_min_timestamp(waiter)? yields Poll::Ready(None) and the group is
finished, set min_idx = Some(i) (and break) so the loop will register the
group's waker (or otherwise ensure a wake source). Apply the same change in both
places handling Poll::Ready(None) around the poll_min_timestamp call (the block
that updates min_timestamp/min_idx and the analogous 182-186 region) so empty
finished groups do not leave the loop parked forever.
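The underflow hazard called out above is easy to reproduce in isolation. A minimal standalone sketch (the variable names are borrowed from the review comment for illustration; this is not the actual consumer code):

```rust
use std::time::Duration;

fn main() {
    // When a pending group's min timestamp is ahead of the max we've read,
    // direct subtraction panics: Duration's Sub overflows below zero.
    let max_timestamp = Duration::from_millis(100);
    let min_timestamp = Duration::from_millis(250);

    // let lag = max_timestamp - min_timestamp; // would panic: overflow

    // saturating_sub clamps at zero, so the latency comparison stays safe.
    let lag = max_timestamp.saturating_sub(min_timestamp);
    assert_eq!(lag, Duration::ZERO);

    // The normal case (max ahead of min) is unaffected.
    let lag = min_timestamp.saturating_sub(max_timestamp);
    assert_eq!(lag, Duration::from_millis(150));
}
```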

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 881dd4a5-0629-4acd-a42e-6eacbcfa60f9

📥 Commits

Reviewing files that changed from the base of the PR and between cd4dc8f and 85c0e8f.

📒 Files selected for processing (1)
  • rs/hang/src/container/consumer.rs

@kixelated kixelated enabled auto-merge (squash) March 5, 2026 22:38
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
rs/hang/src/container/consumer.rs (1)

145-186: ⚠️ Potential issue | 🔴 Critical

Forward terminal groups are never retired, which can deadlock read() after the track finishes.

poll_min_timestamp/poll_max_timestamp now surface terminal empty/error states, but the min/max scans ignore Poll::Ready(Err(_)). If such a group is ahead of current, it can remain in pending forever; combined with finished && !pending.is_empty(), this returns Poll::Pending indefinitely.

Proposed fix
-			for (i, group) in self.pending.iter_mut().enumerate() {
-				if group.info.sequence <= self.current {
-					continue;
-				}
-
-				if let Poll::Ready(Ok(ts)) = group.poll_min_timestamp(waiter) {
-					min_timestamp = min_timestamp.min(ts.into());
-					min_idx = Some(i);
-					break; // We know future groups won't be older than this.
-				}
-			}
+			let mut i = 0;
+			while i < self.pending.len() {
+				if self.pending[i].info.sequence <= self.current {
+					i += 1;
+					continue;
+				}
+
+				match self.pending[i].poll_min_timestamp(waiter) {
+					Poll::Ready(Ok(ts)) => {
+						min_timestamp = min_timestamp.min(ts.into());
+						min_idx = Some(i);
+						break; // We know future groups won't be older than this.
+					}
+					Poll::Ready(Err(e)) => {
+						let sequence = self.pending[i].info.sequence;
+						tracing::warn!(?sequence, error = ?e, "dropping terminal group");
+						self.pending.remove(i);
+					}
+					Poll::Pending => i += 1,
+				}
+			}
@@
-			if finished && self.pending.is_empty() {
-				return Poll::Ready(Ok(None));
-			}
+			if finished {
+				if self.pending.is_empty() {
+					return Poll::Ready(Ok(None));
+				}
+
+				// No more groups can arrive; cross permanent sequence gaps.
+				if let Some(next) = self.pending.front().map(|g| g.info.sequence)
+					&& next > self.current
+				{
+					self.current = next;
+					continue;
+				}
+			}

Also applies to: 334-362
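The gap-skipping half of the proposed fix can be modeled on its own. A hypothetical standalone sketch (the `advance` helper and the plain `u64` sequences are inventions for illustration, not the real consumer types): once the track is finished, no group can ever fill a sequence gap, so the cursor jumps forward instead of waiting forever.

```rust
use std::collections::VecDeque;

// Once `finished` is true, no new group can arrive to fill a sequence gap,
// so jump `current` to the next buffered sequence. While the track is live,
// keep waiting for the missing group instead.
fn advance(current: &mut u64, pending: &VecDeque<u64>, finished: bool) -> bool {
    if finished {
        if let Some(&next) = pending.front() {
            if next > *current {
                *current = next;
                return true;
            }
        }
    }
    false
}

fn main() {
    let pending: VecDeque<u64> = VecDeque::from([5, 7]);

    // Track finished: skip the permanent gap from 2 to 5.
    let mut current = 2;
    assert!(advance(&mut current, &pending, true));
    assert_eq!(current, 5);

    // Track still live: groups 3 and 4 may still arrive, so stay put.
    let mut current = 2;
    assert!(!advance(&mut current, &pending, false));
    assert_eq!(current, 2);
}
```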

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/hang/src/container/consumer.rs`:
- Around lines 190-193: the docstring for poll_read_finish currently claims it
"Returns Pending until all groups have been consumed" which overstates behavior;
update the comment for fn poll_read_finish(&mut self, waiter: &conducer::Waiter)
-> Poll<Result<(), Error>> to say it waits for the track's group stream to
finish (e.g., "Returns Pending until the track's group stream has finished" or
"waits for the track’s group stream to end") so the wording matches the
implementation semantics involving the waiter and the track's group stream.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ea6623dd-9424-4983-84cb-6acf1190e320

📥 Commits

Reviewing files that changed from the base of the PR and between 85c0e8f and 2858a48.

📒 Files selected for processing (2)
  • rs/hang/src/container/consumer.rs
  • rs/hang/src/error.rs

Comment on lines +190 to +193
// Reads any new groups from the track until we're completely finished.
//
// Returns Pending until all groups have been consumed.
fn poll_read_finish(&mut self, waiter: &conducer::Waiter) -> Poll<Result<(), Error>> {
Contributor


⚠️ Potential issue | 🟡 Minor

poll_read_finish docstring overstates behavior.

The method waits for the track’s group stream to finish, not for “all groups to be consumed”. Please update wording to match implementation semantics.

Doc-only tweak
-	// Reads any new groups from the track until we're completely finished.
-	//
-	// Returns Pending until all groups have been consumed.
+	// Reads newly announced groups from the track until the track is finished.
+	//
+	// Returns Pending while the track may still yield more groups.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Reads any new groups from the track until we're completely finished.
//
// Returns Pending until all groups have been consumed.
fn poll_read_finish(&mut self, waiter: &conducer::Waiter) -> Poll<Result<(), Error>> {
// Reads newly announced groups from the track until the track is finished.
//
// Returns Pending while the track may still yield more groups.
fn poll_read_finish(&mut self, waiter: &conducer::Waiter) -> Poll<Result<(), Error>> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/container/consumer.rs` around lines 190-193: the docstring for
poll_read_finish currently claims it "Returns Pending until all groups have been
consumed" which overstates behavior; update the comment for fn
poll_read_finish(&mut self, waiter: &conducer::Waiter) -> Poll<Result<(),
Error>> to say it waits for the track's group stream to finish (e.g., "Returns
Pending until the track's group stream has finished" or "waits for the track’s
group stream to end") so the wording matches the implementation semantics
involving the waiter and the track's group stream.

@kixelated kixelated merged commit 5e533f6 into main Mar 5, 2026
2 checks passed
@kixelated kixelated deleted the kixelated/consume-from-zero branch March 5, 2026 22:49
@moq-bot moq-bot bot mentioned this pull request Mar 5, 2026