diff --git a/Cargo.lock b/Cargo.lock index ff669e1..846ab85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,6 +194,15 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "buf-list" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6b175f9cf8fffedd4c4b18bcfef092356e952b81f596e148f18e98280994593" +dependencies = [ + "bytes", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -816,10 +825,12 @@ checksum = "ab2f85be813ce08f0569fcd816f256c6f4287c975d69a9a14ceacc309f1de967" [[package]] name = "hang" -version = "0.6.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e697f84f0dbb448259725abe38c3259b039ab96bd21b0b6cc45ad64530d4dc43" +checksum = "2b4ffdb9c72ec7f06cacea47589d3b2d71452e85cdb780033d99fbb00e46c98f" dependencies = [ + "anyhow", + "buf-list", "bytes", "derive_more", "futures", @@ -828,6 +839,7 @@ dependencies = [ "lazy_static", "moq-lite", "mp4-atom", + "num_enum", "regex", "serde", "serde_json", @@ -842,6 +854,7 @@ name = "hang-gst" version = "0.2.2" dependencies = [ "anyhow", + "bytes", "gst-plugin-version-helper", "gstreamer", "gstreamer-base", @@ -1310,13 +1323,14 @@ dependencies = [ [[package]] name = "moq-lite" -version = "0.8.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0960a60a94712ca946d9deaf0d02e1baa74c519942bd428432935acdc06d626" +checksum = "2c2258f990ddd8465f8dcf343bd00714927cd8b8b81784b153020ca24e47898f" dependencies = [ "async-channel", "bytes", "futures", + "hex", "num_enum", "serde", "thiserror 2.0.17", @@ -1328,9 +1342,9 @@ dependencies = [ [[package]] name = "moq-native" -version = "0.8.4" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "088447cbb8345720ed5ca84d746e2ec0295d24f218daa4bc6e6b5085c239e9e8" +checksum = "0e9c5c481870917a231c5c8d0ad8d293007b12efaf16ddf49c70b833c5f9af3e" dependencies = [ "anyhow", "clap", @@ -1356,9 +1370,9 @@ dependencies = [ [[package]] name = "mp4-atom" -version = "0.8.1" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a59fd1da0f5d0864ba19bd9d45cefc3dc19835f6f3bbf9016a86e4b4480042" +checksum = "58b9fcf396d53fdf1c43a9afd38953412b9d782d11391807b473927317bb28f9" dependencies = [ "bytes", "derive_more", @@ -2745,21 +2759,22 @@ dependencies = [ [[package]] name = "web-transport-proto" -version = "0.2.8" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "974fa1e325e6cc5327de8887f189a441fcff4f8eedcd31ec87f0ef0cc5283fbc" +checksum = "4b5400535d6dd4c07dc86e83651a838fd513de7f5011d4e4eafa239fa4d0ded4" dependencies = [ "bytes", "http", "thiserror 2.0.17", + "tokio", "url", ] [[package]] name = "web-transport-quinn" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88e34f06fb4cd4760dc9bd348199d1759eed196ebb4ce0aa40b3817069cc58e8" +checksum = "91815d3170c715230c94b5107a71ccf81646513e548ee1408c3ce285d021d6ca" dependencies = [ "bytes", "futures", @@ -2777,9 +2792,9 @@ dependencies = [ [[package]] name = "web-transport-trait" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07665af67c56637c938425911b9b5a4f6aaea45709354e4656b9e2de45768c68" +checksum = "8f4bafa8c6ff708042f67ef8031ca0f342822fd785b70f36a4b2c014760fc442" dependencies = [ "bytes", ] diff --git a/Cargo.toml b/Cargo.toml index d49e83d..06a449c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,12 +19,13 @@ path = "src/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# Using published versions from crates.io instead of workspace -hang = "0.6" -moq-lite = "0.8" -moq-native = "0.8" +# Using local path dependencies from moq-dev +hang = "0.9.0" +moq-lite = "0.10.1" +moq-native = "0.10.1" anyhow = { version = "1", features = ["backtrace"] } +bytes = "1" gst = { package = "gstreamer", version = "0.23" } gst-base = { package = "gstreamer-base", version = "0.23" } #gst-app = { package = "gstreamer-app", version = "0.23", features = ["v1_20"] } diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 27366e0..0c73504 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -1,4 +1,5 @@ use anyhow::Context as _; +use bytes::BytesMut; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -28,7 +29,8 @@ struct Settings { #[derive(Default)] struct State { - pub media: Option, + pub media: Option, + pub buffer: BytesMut, } #[derive(Default)] @@ -140,12 +142,24 @@ impl BaseSinkImpl for HangSink { let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; let mut state = self.state.lock().unwrap(); + + // Append incoming data to our buffer + state.buffer.extend_from_slice(data.as_slice()); + + // Take media out temporarily to avoid borrow conflict let mut media = state.media.take().expect("not initialized"); - // TODO avoid full media parsing? gst should be able to provide the necessary info - media.parse(data.as_slice()).expect("failed to parse"); + // Try to decode what we have buffered + let result = media.decode(&mut state.buffer); + + // Put media back state.media = Some(media); + if let Err(e) = result { + gst::error!(gst::CAT_DEFAULT, "Failed to decode: {}", e); + return Err(gst::FlowError::Error); + } + Ok(gst::FlowSuccess::Ok) } } @@ -180,7 +194,7 @@ impl HangSink { .await .expect("failed to connect"); - let media = hang::cmaf::Import::new(broadcast.producer); + let media = hang::import::Fmp4::new(broadcast.producer.into()); let mut state = self.state.lock().unwrap(); state.media = Some(media); diff --git a/src/source/imp.rs b/src/source/imp.rs index 7f4933f..c68ab9f 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -185,7 +185,7 @@ impl HangSrc { .consume_broadcast(&name) .ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", name))?; - let catalog = broadcast.subscribe_track(&hang::Catalog::default_track()); + let catalog = broadcast.subscribe_track(&hang::catalog::Catalog::default_track()); let mut catalog = hang::catalog::CatalogConsumer::new(catalog); // TODO handle catalog updates @@ -194,7 +194,8 @@ impl HangSrc { if let Some(video) = catalog.video { for (track_name, config) in video.renditions { let track_ref = hang::moq_lite::Track::new(&track_name); - let mut track: hang::TrackConsumer = broadcast.subscribe_track(&track_ref).into(); + let mut track = + hang::TrackConsumer::new(broadcast.subscribe_track(&track_ref), std::time::Duration::from_secs(1)); let caps = match config.codec { hang::catalog::VideoCodec::H264(_) => { @@ -239,14 +240,15 @@ impl HangSrc { let mut reference = None; tokio::spawn(async move { loop { - match track.read().await { + match track.read_frame().await { Ok(Some(frame)) => { - let mut buffer = gst::Buffer::from_slice(frame.payload); + let payload: Vec = frame.payload.into_iter().flatten().collect(); + let mut buffer = gst::Buffer::from_slice(payload); let buffer_mut = buffer.get_mut().unwrap(); // Make timestamps relative to the first frame for proper playback let pts = if let Some(reference_ts) = reference { - let timestamp: std::time::Duration = frame.timestamp - reference_ts; + let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into(); gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) } else { reference = Some(frame.timestamp); @@ -287,7 +289,8 @@ impl HangSrc { if let Some(audio) = catalog.audio { for (track_name, config) in audio.renditions { let track_ref = hang::moq_lite::Track::new(&track_name); - let mut track: hang::TrackConsumer = broadcast.subscribe_track(&track_ref).into(); + let mut track = + hang::TrackConsumer::new(broadcast.subscribe_track(&track_ref), std::time::Duration::from_secs(1)); let caps = match &config.codec { hang::catalog::AudioCodec::AAC(_aac) => { @@ -346,14 +349,15 @@ impl HangSrc { let mut reference = None; tokio::spawn(async move { loop { - match track.read().await { + match track.read_frame().await { Ok(Some(frame)) => { - let mut buffer = gst::Buffer::from_slice(frame.payload); + let payload: Vec = frame.payload.into_iter().flatten().collect(); + let mut buffer = gst::Buffer::from_slice(payload); let buffer_mut = buffer.get_mut().unwrap(); // Make timestamps relative to the first frame for proper playback let pts = if let Some(reference_ts) = reference { - let timestamp: std::time::Duration = frame.timestamp - reference_ts; + let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into(); gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) } else { reference = Some(frame.timestamp);