Skip to content

MoqSrc/MoqSink: refactor to async session tasks with channel-based pad forwarding#36

Draft
SteveMcFarlin wants to merge 4 commits intomoq-dev:mainfrom
SteveMcFarlin:src_sink_refactor
Draft

MoqSrc/MoqSink: refactor to async session tasks with channel-based pad forwarding#36
SteveMcFarlin wants to merge 4 commits intomoq-dev:mainfrom
SteveMcFarlin:src_sink_refactor

Conversation

@SteveMcFarlin
Copy link

@SteveMcFarlin SteveMcFarlin commented Mar 5, 2026

MoqSink Summary

  • State management: Replace the block_on-based setup() with start_session()/stop_session() and a Tokio-spawned run_session, so GLib state changes no longer block while establishing MoQ sessions. All session state now lives inside the async task’s RuntimeState.
  • Pad handling: Keep dynamic pad creation, but forward CAPS/buffers through a bounded control channel (forward_event/forward_buffer). Decoder instantiation (handle_caps) and timestamp math remain identical; actual decoding/publishing now happens off the GLib thread.
  • Network setup: Rebuild the publish pipeline asynchronously—create OriginProducer, BroadcastProducer, catalog via moq_mux::CatalogProducer::new, publish the broadcast, and connect with client.with_publish(origin.consume()). The live Session is retained inside RuntimeState.
  • Flow control: blocking_send provides clearer backpressure semantics (eventually yields FLOW_FLUSHING), though upstream queues are still recommended for bursty inputs.

Potential regressions / tests

  1. RuntimeState owns all session/broadcast state now—verify nothing else expects synchronous access.
  2. CAPS/buffer ordering per pad is preserved, but cross-pad ordering depends on message interleaving; test pad add/remove under load.
  3. Only CAPS events are forwarded explicitly; add control messages if flush/segment handling is needed.
  4. Errors are logged and the channel closes; integration tests should cover handshake failure, decoder errors, and shutdown timing to ensure pipelines react gracefully.

MoqSrc Summary

  • Element Typeimp.rs now subclasses gst::Element instead of gst::Bin, so the source behaves like a leaf element and owns all pad logic itself.
  • State Managementchange_state no longer blocks on setup(). Ready→Paused just spins up a Tokio‑backed SessionController, while Paused→Ready signals it to stop
  • Session Lifecycle – Catalog discovery, pad creation, and buffer forwarding now flow through a control_tx/control_rx channel plus run_session/spawn_track_pump, so all networking happens off the GLib thread.
  • Pad Handling – Pads are created on demand via ControlMessage::CreatePad, and PadMessage routing through a GLib helper keeps buffer/event pushes serialized on the main loop
  • Codec Supportvideo_caps now emits caps for H.264, H.265 (hev1/hvc1/byte-stream), and AV1, bringing the source in line with the sink’s decoder stack.
  • Cleanupstop_session() drops the control task, sends PadMessage::Drop to every pad, and clears the channel sender so downstream sees a flush.
  • Testing – Added a unit test covering the GLib forwarder to ensure channel messages stay in order.
  • Runtime Usage – Still uses a dedicated Tokio runtime, but with LazyLock and all async work centralized in the session task instead of ad-hoc tokio::spawn calls inside setup().

Potential regressions

  • Channel backpressure vs. GLib timing – Pads now use mpsc::UnboundedSender into the main-context forwarder. If the forwarder stalls (e.g., GLib thread busy) those queues can grow without bound; previously the direct pad.push would backpressure immediately.
  • Per-pad ordering – Catalog pads are now independent tasks sending through PadMessage channels. Ordering is preserved per pad, but cross-pad sequencing may differ from the old implementation that happened to poll tracks sequentially inside setup().
  • Shutdown timingstop_session sends PadMessage::Drop and aborts the GLib task, but downstream might see EOS/flush later than before because we now wait for the async pumps to notice shutdown.
  • Codec negotiation – H.265/AV1 caps mirror the sink’s assumptions (e.g., choosing hev1 vs hvc1 based on description), but if downstream expects different stream-format strings or fields, negotiation could fail.
  • Error propagation – Session failures are only surfaced via ControlMessage::ReportErrorgst::element_error!. There’s still no recovery path; repeated errors may leave the element in a flushing state without attempting reconnection, same as before but now more asynchronous.

@SteveMcFarlin SteveMcFarlin changed the title moqsink: async session controller with dynamic pad forwarding GST moqsink and moqsrc refactor Mar 5, 2026
@SteveMcFarlin SteveMcFarlin changed the title GST moqsink and moqsrc refactor MoqSrc/MoqSink: refactor to async session tasks with channel-based pad forwarding Mar 5, 2026
Copy link
Collaborator

@kixelated kixelated left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits

src/sink/imp.rs Outdated
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not needed. Quinn itself doesn't scale past a single thread.

src/sink/imp.rs Outdated

#[derive(Debug)]
struct SessionHandle {
sender: mpsc::Sender<ControlMessage>,
Copy link
Collaborator

@kixelated kixelated Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch to an unbounded sender?

reference_pts: None,
},
);
async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver<ControlMessage>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO return an error

-> anyhow::Result<()> {

So we only need to gst::error! in one place.

pad_endpoint: PadEndpoint,
mut shutdown: watch::Receiver<bool>,
) -> tokio::task::JoinHandle<()> {
RUNTIME.spawn(async move {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO you should remove a level of indentation by inlining the spawn:

let handle = RUNTIME.spawn(run_track_pump);

fn run_track_pump(
	mut track: hang::container::OrderedConsumer,
	descriptor: TrackDescriptor,
	pad_endpoint: PadEndpoint,
	mut shutdown: watch::Receiver<bool>,
) {

F: FnMut(T) -> bool + 'static,
{
let ctx = context.clone();
ctx.spawn_local(async move {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this? Should we be using this runtime instead..?

use std::rc::Rc;

#[test]
fn forwarder_delivers_messages_in_order() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk if this test is needed. mpsc works

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants