Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 105 additions & 118 deletions rs/moq-mux/src/import/hls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
//! independent of any particular HTTP client; callers provide an implementation
//! of [`Fetcher`] to perform the actual network I/O.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::path::PathBuf;
use std::time::Duration;

use anyhow::Context;
use bytes::Bytes;
use m3u8_rs::{
AlternativeMedia, AlternativeMediaType, Map, MasterPlaylist, MediaPlaylist, MediaSegment, Resolution, VariantStream,
Map, MasterPlaylist, MediaPlaylist, MediaSegment,
};
use reqwest::Client;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -76,13 +74,21 @@ struct StepOutcome {
///
/// Provides `init()` to prime the ingest with initial segments, and `service()`
/// to run the continuous ingest loop.
///
/// In addition to importing media, this generates HLS playlists as MoQ tracks:
/// - `playlist.m3u8`: Main/master playlist referencing all renditions
/// - `video{n}.m3u8`: Media playlist for each video rendition
/// - `audio.m3u8`: Media playlist for the audio rendition (if present)
/// @TODO: multi audio
pub struct Hls {
/// Broadcast that all CMAF importers write into.
broadcast: moq_lite::BroadcastProducer,

/// The catalog being produced.
catalog: hang::CatalogProducer,

master_playlist_track: Option<moq_lite::TrackProducer>,

/// fMP4 importers for each discovered video rendition.
/// Each importer feeds a separate MoQ track but shares the same catalog.
video_importers: Vec<Fmp4>,
Expand All @@ -108,14 +114,20 @@ enum TrackKind {

struct TrackState {
playlist: Url,
playlist_track: moq_lite::TrackProducer,
track_id: String,
init_track: Option<moq_lite::TrackProducer>,
next_sequence: Option<u64>,
init_ready: bool,
}

impl TrackState {
fn new(playlist: Url) -> Self {
fn new(playlist: Url, playlist_track: moq_lite::TrackProducer, track_id: String) -> Self {
Self {
playlist,
playlist_track,
track_id,
init_track: None,
next_sequence: None,
init_ready: false,
}
Expand All @@ -140,6 +152,7 @@ impl Hls {
Ok(Self {
broadcast,
catalog,
master_playlist_track: None,
video_importers: Vec::new(),
passthrough,
audio_importer: None,
Expand Down Expand Up @@ -220,15 +233,17 @@ impl Hls {
/// and returns how many segments were written along with the target
/// duration to guide scheduling of the next step.
async fn step(&mut self) -> anyhow::Result<StepOutcome> {
// Fetch master playlist, create tracks for each variant advertised.
self.ensure_tracks().await?;

let mut wrote = 0usize;
let mut target_duration = None;

// @TODO: consolidate audio_tracks/video_tracks into media_tracks
// Ingest a step from all active video variants.
let video_tracks = std::mem::take(&mut self.video);
for (index, mut track) in video_tracks.into_iter().enumerate() {
let playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
let mut playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
// Use the first video's target duration as the base.
if target_duration.is_none() {
target_duration = Some(playlist.target_duration);
Expand All @@ -237,19 +252,27 @@ impl Hls {
.consume_segments(TrackKind::Video(index), &mut track, &playlist, None)
.await?;
wrote += count;

self.rewrite_segment_locations(&mut playlist, &track.track_id);

publish_playlist(track.playlist_track.clone(), Playlist::Media(playlist.clone()));
self.video.push(track);
}

// Ingest from the shared audio track, if present.
if let Some(mut track) = self.audio.take() {
let playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
let mut playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
if target_duration.is_none() {
target_duration = Some(playlist.target_duration);
}
let count = self
.consume_segments(TrackKind::Audio, &mut track, &playlist, None)
.await?;
wrote += count;

self.rewrite_segment_locations(&mut playlist, &track.track_id);

publish_playlist(track.playlist_track.clone(), Playlist::Media(playlist.clone()));
self.audio = Some(track);
}

Expand Down Expand Up @@ -288,42 +311,56 @@ impl Hls {
}

let body = self.fetch_bytes(self.base_url.clone()).await?;
if let Ok((_, master)) = m3u8_rs::parse_master_playlist(&body) {
let variants = select_variants(&master);
anyhow::ensure!(!variants.is_empty(), "no usable variants found in master playlist");

if let Ok((_, mut master)) = m3u8_rs::parse_master_playlist(&body) {
// Create a video track state for every usable variant.
for variant in &variants {
let video_url = resolve_uri(&self.base_url, &variant.uri)?;
self.video.push(TrackState::new(video_url));
for (index, variant) in master.variants.iter_mut().enumerate() {
let media_url = resolve_uri(&self.base_url, &variant.uri)?;
let track_id = format!("{}{}", if variant.resolution.is_some() { "video" } else { "audio" }, index);
variant.uri = format!("{}.m3u8", track_id);

let playlist_track = self.broadcast.create_track(moq_lite::Track::new(format!("{}.m3u8", track_id)));

// @TODO: do we really need separate state for Audio and Video tracks? Can this just be a vec of Media Tracks?
// no reason this couldn't handle subtitles as well.
if track_id.contains("video") {
self.video.push(TrackState::new(media_url, playlist_track, track_id));
}
// @NOTE: ignoring audio tracks because these really should only be advertised as #EXT-X-MEDIA variants
// I think this is just a bug in the output of our `just hls bbb` ingest, "real" hls playlists shouldn't do this
}

// Choose an audio rendition based on the first variant with an audio group.
if let Some(group_id) = variants.iter().find_map(|v| v.audio.as_deref()) {
if let Some(audio_tag) = select_audio(&master, group_id) {
if let Some(uri) = &audio_tag.uri {
let audio_url = resolve_uri(&self.base_url, uri)?;
self.audio = Some(TrackState::new(audio_url));
} else {
warn!(%group_id, "audio rendition missing URI");
}
} else {
warn!(%group_id, "audio group not found in master playlist");
// Audio tracks all live under "alternatives", we'll also handle captions/text tracks here eventually
for (index, alternative) in master.alternatives.iter_mut().enumerate() {
if let Some(uri) = &alternative.uri {
let media_url = resolve_uri(&self.base_url, uri)?;
let track_id = format!("{}{}", &alternative.media_type.to_string().to_lowercase(), index);
alternative.uri = Some(format!("{}.m3u8", track_id));
let playlist_track = self.broadcast.create_track(moq_lite::Track::new(format!("{}.m3u8", track_id)));

// @TODO: push this into a generic media track array
self.audio = Some(TrackState::new(media_url, playlist_track, track_id));
}
}

let audio_url = self.audio.as_ref().map(|a| a.playlist.to_string());
info!(
video_variants = variants.len(),
video_variants = master.variants.len(),
audio = audio_url.as_deref().unwrap_or("none"),
"selected master playlist renditions"
);

self.master_playlist_track = Some(self.broadcast.create_track(moq_lite::Track::new("playlist.m3u8")));
if let Some(ref mut track) = self.master_playlist_track {
publish_playlist(track.clone(), Playlist::Master(master))
}
return Ok(());
}

// Fallback: treat the provided URL as a single media playlist.
self.video.push(TrackState::new(self.base_url.clone()));
self.video.push(TrackState::new(self.base_url.clone(),
self.broadcast.create_track(moq_lite::Track::new("video0.m3u8")),
"video0".to_string()
));
Ok(())
}

Expand Down Expand Up @@ -405,7 +442,20 @@ impl Hls {
let map = self.find_map(playlist).context("playlist missing EXT-X-MAP")?;

let url = resolve_uri(&track.playlist, &map.uri)?;

if track.init_track.is_none() {
let init_track_name = format!("{}.init.mp4", track.track_id);
track.init_track = Some(self.broadcast.create_track(moq_lite::Track::new(init_track_name)));
}

let mut bytes = self.fetch_bytes(url).await?;

// Publish init segment to its track
let init_track = track.init_track.as_mut().expect("init_track was just created");
let mut group = init_track.append_group();
group.write_frame(bytes.clone());
group.close();

let importer = match kind {
TrackKind::Video(index) => self.ensure_video_importer_for(index),
TrackKind::Audio => self.ensure_audio_importer(),
Expand Down Expand Up @@ -507,6 +557,17 @@ impl Hls {
.get_or_insert_with(|| Fmp4::new(self.broadcast.clone(), self.catalog.clone(), Fmp4Config { passthrough }))
}

fn rewrite_segment_locations(&mut self, playlist: &mut MediaPlaylist, track_id: &str) {
let msn = playlist.media_sequence;
// Find and modify the first segment with a map
for (index, segment) in playlist.segments.iter_mut().enumerate() {
if let Some(ref mut map) = segment.map {
map.uri = format!("{}.init.mp4?group=0", track_id); // @TODO: do I need ?group=0 for this?
}
segment.uri = format!("{}.m4s?group={}", track_id, msn + index as u64);
}
}

#[cfg(test)]
fn has_video_importer(&self) -> bool {
!self.video_importers.is_empty()
Expand All @@ -518,107 +579,33 @@ impl Hls {
}
}

fn select_audio<'a>(master: &'a MasterPlaylist, group_id: &str) -> Option<&'a AlternativeMedia> {
let mut first = None;
let mut default = None;

for alternative in master
.alternatives
.iter()
.filter(|alt| alt.media_type == AlternativeMediaType::Audio && alt.group_id == group_id)
{
if first.is_none() {
first = Some(alternative);
}
if alternative.default {
default = Some(alternative);
break;
}
}

default.or(first)
}

fn select_variants(master: &MasterPlaylist) -> Vec<&VariantStream> {
// Helper to extract the first video codec token from the CODECS attribute.
fn first_video_codec(variant: &VariantStream) -> Option<&str> {
let codecs = variant.codecs.as_deref()?;
codecs.split(',').map(|s| s.trim()).find(|s| !s.is_empty())
}

// Map codec strings into a coarse "family" so we can prefer H.264 over others.
fn codec_family(codec: &str) -> Option<&'static str> {
if codec.starts_with("avc1.") || codec.starts_with("avc3.") {
Some("h264")
} else {
None
}
}

// Consider only non-i-frame variants with a URI and a known codec family.
let candidates: Vec<(&VariantStream, &str, &str)> = master
.variants
.iter()
.filter(|variant| !variant.is_i_frame && !variant.uri.is_empty())
.filter_map(|variant| {
let codec = first_video_codec(variant)?;
let family = codec_family(codec)?;
Some((variant, codec, family))
})
.collect();

if candidates.is_empty() {
return Vec::new();
fn resolve_uri(base: &Url, value: &str) -> std::result::Result<Url, url::ParseError> {
if let Ok(url) = Url::parse(value) {
return Ok(url);
}

// Prefer families in this order, falling back to the first available.
const FAMILY_PREFERENCE: &[&str] = &["h264"];

let families_present: Vec<&str> = candidates.iter().map(|(_, _, fam)| *fam).collect();

let target_family = FAMILY_PREFERENCE
.iter()
.find(|fav| families_present.iter().any(|fam| fam == *fav))
.copied()
.unwrap_or(families_present[0]);

// Keep only variants in the chosen family.
let family_variants: Vec<&VariantStream> = candidates
.into_iter()
.filter(|(_, _, fam)| *fam == target_family)
.map(|(variant, _, _)| variant)
.collect();

// Deduplicate by resolution, keeping the lowest-bandwidth variant for each size.
let mut by_resolution: HashMap<Option<Resolution>, &VariantStream> = HashMap::new();
base.join(value)
}

for variant in family_variants {
let key = variant.resolution;
let bandwidth = variant.average_bandwidth.unwrap_or(variant.bandwidth);
#[derive(Debug)]
enum Playlist {
Master(MasterPlaylist),
Media(MediaPlaylist),
}

match by_resolution.entry(key) {
Entry::Vacant(entry) => {
entry.insert(variant);
}
Entry::Occupied(mut entry) => {
let existing = entry.get();
let existing_bw = existing.average_bandwidth.unwrap_or(existing.bandwidth);
if bandwidth < existing_bw {
entry.insert(variant);
}
}
}
}
fn publish_playlist(mut playlist_track: moq_lite::TrackProducer, playlist: Playlist) {
let mut group = playlist_track.append_group();

by_resolution.values().cloned().collect()
}
let mut v: Vec<u8> = Vec::new();

fn resolve_uri(base: &Url, value: &str) -> std::result::Result<Url, url::ParseError> {
if let Ok(url) = Url::parse(value) {
return Ok(url);
match playlist {
Playlist::Master(master) => master.write_to(&mut v).unwrap(),
Playlist::Media(media) => media.write_to(&mut v).unwrap(),
}

base.join(value)
group.write_frame(v);
group.close();
}

#[cfg(test)]
Expand Down
Loading
Loading