MoqSrc/MoqSink: refactor to async session tasks with channel-based pad forwarding#36
Draft
SteveMcFarlin wants to merge 4 commits intomoq-dev:mainfrom
Draft
MoqSrc/MoqSink: refactor to async session tasks with channel-based pad forwarding#36SteveMcFarlin wants to merge 4 commits intomoq-dev:mainfrom
SteveMcFarlin wants to merge 4 commits intomoq-dev:mainfrom
Conversation
…ing and H.264/H.265/AV1 caps support.
kixelated
reviewed
Mar 5, 2026
src/sink/imp.rs
Outdated
| static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| { | ||
| tokio::runtime::Builder::new_multi_thread() | ||
| .enable_all() | ||
| .worker_threads(4) |
Collaborator
There was a problem hiding this comment.
Probably not needed. Quinn itself doesn't scale past a single thread.
src/sink/imp.rs
Outdated
|
|
||
| #[derive(Debug)] | ||
| struct SessionHandle { | ||
| sender: mpsc::Sender<ControlMessage>, |
Collaborator
There was a problem hiding this comment.
Switch to an unbounded sender?
| reference_pts: None, | ||
| }, | ||
| ); | ||
| async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver<ControlMessage>) { |
Collaborator
There was a problem hiding this comment.
IMO return an error
-> anyhow::Result<()> {So we only need to gst::error! in one place.
| pad_endpoint: PadEndpoint, | ||
| mut shutdown: watch::Receiver<bool>, | ||
| ) -> tokio::task::JoinHandle<()> { | ||
| RUNTIME.spawn(async move { |
Collaborator
There was a problem hiding this comment.
IMO you should remove a level of indentation by inlining the spawn:
let handle = RUNTIME.spawn(run_track_pump);
fn run_track_pump(
mut track: hang::container::OrderedConsumer,
descriptor: TrackDescriptor,
pad_endpoint: PadEndpoint,
mut shutdown: watch::Receiver<bool>,
) {| F: FnMut(T) -> bool + 'static, | ||
| { | ||
| let ctx = context.clone(); | ||
| ctx.spawn_local(async move { |
Collaborator
There was a problem hiding this comment.
What is this? Should we be using this runtime instead..?
| use std::rc::Rc; | ||
|
|
||
| #[test] | ||
| fn forwarder_delivers_messages_in_order() { |
Collaborator
There was a problem hiding this comment.
idk if this test is needed. mpsc works
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
MoqSink Summary
block_on-basedsetup()withstart_session()/stop_session()and a Tokio-spawnedrun_session, so GLib state changes no longer block while establishing MoQ sessions. All session state now lives inside the async task’sRuntimeState.forward_event/forward_buffer). Decoder instantiation (handle_caps) and timestamp math remain identical; actual decoding/publishing now happens off the GLib thread.OriginProducer,BroadcastProducer, catalog viamoq_mux::CatalogProducer::new, publish the broadcast, and connect withclient.with_publish(origin.consume()). The liveSessionis retained insideRuntimeState.blocking_sendprovides clearer backpressure semantics (eventually yieldsFLOW_FLUSHING), though upstream queues are still recommended for bursty inputs.Potential regressions / tests
RuntimeStateowns all session/broadcast state now—verify nothing else expects synchronous access.MoqSrc Summary
imp.rsnow subclassesgst::Elementinstead ofgst::Bin, so the source behaves like a leaf element and owns all pad logic itself.change_stateno longer blocks onsetup(). Ready→Paused just spins up a Tokio‑backedSessionController, while Paused→Ready signals it to stopcontrol_tx/control_rxchannel plusrun_session/spawn_track_pump, so all networking happens off the GLib thread.ControlMessage::CreatePad, andPadMessagerouting through a GLib helper keeps buffer/event pushes serialized on the main loopvideo_capsnow emits caps for H.264, H.265 (hev1/hvc1/byte-stream), and AV1, bringing the source in line with the sink’s decoder stack.stop_session()drops the control task, sendsPadMessage::Dropto every pad, and clears the channel sender so downstream sees a flush.LazyLockand all async work centralized in the session task instead of ad-hoctokio::spawncalls insidesetup().Potential regressions
mpsc::UnboundedSenderinto the main-context forwarder. If the forwarder stalls (e.g., GLib thread busy) those queues can grow without bound; previously the directpad.pushwould backpressure immediately.PadMessagechannels. Ordering is preserved per pad, but cross-pad sequencing may differ from the old implementation that happened to poll tracks sequentially insidesetup().stop_sessionsendsPadMessage::Dropand aborts the GLib task, but downstream might see EOS/flush later than before because we now wait for the async pumps to notice shutdown.hev1vshvc1based ondescription), but if downstream expects differentstream-formatstrings or fields, negotiation could fail.ControlMessage::ReportError→gst::element_error!. There’s still no recovery path; repeated errors may leave the element in a flushing state without attempting reconnection, same as before but now more asynchronous.