From 9ca5a843fd8c20ece91ea396824cdcf11d374999 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 18 Mar 2025 11:56:12 -0700 Subject: [PATCH 1/7] Initial attempt at a source. --- .gitignore | 2 +- Cargo.toml | 1 + README.md | 36 +++++-- dev/pub | 48 --------- justfile | 89 ++++++++++++++++ src/lib.rs | 4 + src/sink/imp.rs | 2 +- src/sink/mod.rs | 1 - src/source/imp.rs | 260 ++++++++++++++++++++++++++++++++++++++++++++++ src/source/mod.rs | 16 +++ 10 files changed, 401 insertions(+), 58 deletions(-) delete mode 100755 dev/pub create mode 100644 justfile create mode 100644 src/source/imp.rs create mode 100644 src/source/mod.rs diff --git a/.gitignore b/.gitignore index db2ff9b..835da39 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ target/ *.mp4 *.fmp4 -debug/ \ No newline at end of file +debug/ diff --git a/Cargo.toml b/Cargo.toml index 2a6d4d7..7e576c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ moq-karp = "0.14" gst = { package = "gstreamer", version = "0.23" } gst-base = { package = "gstreamer-base", version = "0.23" } + once_cell = "1" tokio = { version = "1", features = ["full"] } env_logger = "0.9" diff --git a/README.md b/README.md index 8bd58b6..c16a9a1 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,39 @@ +

+ Media over QUIC +

+ A gstreamer plugin utilizing [moq-rs](https://github.com/kixelated/moq-rs). # Usage -Check out the `dev/pub` script for an example pipeline. +## Requirements +- [Rustup](https://www.rust-lang.org/tools/install) +- [Just](https://github.com/casey/just?tab=readme-ov-file#installation) + +## Setup +We use `just` to simplify the development process. +Check out the [Justfile](justfile) or run `just` to see the available commands. -```bash -./dev/pub +Install any other required tools: +```sh +just setup ``` -By default this uses a localhost relay. -You can change the ENV args if you want to make it watchable on production instead: +## Development +First make sure you have a local moq-relay server running: +```sh +# In github.com/kixelated/moq-rs +just relay +``` + +Now you can publish and subscribe to a video: +```sh +# In github.com/kixelated/moq-gst + +# Publish to a localhost moq-relay server +just pub -```bash -ADDR=relay.quic.video NAME=something ./dev/pub +# Subscribe from a localhost moq-relay server +just sub ``` # License diff --git a/dev/pub b/dev/pub deleted file mode 100755 index b4b8b15..0000000 --- a/dev/pub +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/bash -set -euo pipefail - -# Change directory to the root of the project -cd "$(dirname "$0")/.." - -# Use info logging by default -export RUST_LOG="${RUST_LOG:-info}" - -# Connect to localhost by default. -HOST="${HOST:-localhost}" -PORT="${PORT:-4443}" -ADDR="${ADDR:-$HOST:$PORT}" -SCHEME="${SCHEME:-https}" - -# Use the name "bbb" for the broadcast. -NAME="${NAME:-demo/bbb}" - -# Combine the host into a URL. -URL="${URL:-"$SCHEME://$ADDR/$NAME"}" - -# Make sure we build the gstreamer plugin -cargo build - -export GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" - -# Some useful debugging environment variables -# export GST_DEBUG_DUMP_DOT_DIR="${PWD}/debug" -# export GST_DEBUG=*:5 - -# Download the Big Buck Bunny video if it doesn't exist -if [ ! -f dev/bbb.fmp4 ]; then - if [ ! -f dev/bbb.mp4 ]; then - echo "Downloading ya boye Big Buck Bunny..." - wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O dev/bbb.mp4 - fi - - echo "Converting to a (properly) fragmented MP4..." - ffmpeg -i dev/bbb.mp4 \ - -c copy \ - -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame \ - dev/bbb.fmp4 -fi - -# Run gstreamer and pipe the output to moq-pub -gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux \ - demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink url="$URL" tls-disable-verify=true \ - demux.audio_0 ! aacparse ! queue ! mux. diff --git a/justfile b/justfile new file mode 100644 index 0000000..51b6db7 --- /dev/null +++ b/justfile @@ -0,0 +1,89 @@ +#!/usr/bin/env just --justfile + +# Using Just: https://github.com/casey/just?tab=readme-ov-file#installation + +export RUST_BACKTRACE := "1" +export RUST_LOG := "info" +export URL := "http://localhost:4443" + +# List all of the available commands. +default: + just --list + +# Install any required dependencies. +setup: + # Upgrade Rust + rustup update + + # Make sure the right components are installed. + rustup component add rustfmt clippy + + # Install cargo binstall if needed. + cargo install cargo-binstall + + # Install cargo shear if needed. + cargo binstall --no-confirm cargo-shear + +# Download the video and convert it to a fragmented MP4 that we can stream +download name url: + if [ ! -f dev/{{name}}.mp4 ]; then \ + wget {{url}} -O dev/{{name}}.mp4; \ + fi + + if [ ! -f dev/{{name}}.fmp4 ]; then \ + ffmpeg -i dev/{{name}}.mp4 \ + -c copy \ + -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame \ + dev/{{name}}.fmp4; \ + fi + +# Publish a video using ffmpeg to the localhost relay server +pub: (download "bbb" "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4") + # Build the plugins + cargo build + + # Run gstreamer and pipe the output to our plugin + GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ + gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux \ + demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink url="$URL/demo/bbb" tls-disable-verify=true \ + demux.audio_0 ! aacparse ! queue ! mux. + +# Subscribe to a video using gstreamer +sub: + # Build the plugins + cargo build + + # Run gstreamer and pipe the output to our plugin + # This will render the video to the screen + GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ + gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! autovideosink + +# Run the CI checks +check $RUSTFLAGS="-D warnings": + cargo check --all-targets + cargo clippy --all-targets -- -D warnings + cargo fmt -- --check + cargo shear # requires: cargo binstall cargo-shear + +# Run any CI tests +test $RUSTFLAGS="-D warnings": + cargo test + +# Automatically fix some issues. +fix: + cargo fix --allow-staged --all-targets --all-features + cargo clippy --fix --allow-staged --all-targets --all-features + cargo fmt --all + cargo shear --fix + +# Upgrade any tooling +upgrade: + rustup upgrade + + # Install cargo-upgrades if needed. + cargo install cargo-upgrades cargo-edit + cargo upgrade + +# Build the plugins +build: + cargo build \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 404d694..6f9b3da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,13 @@ use gst::glib; mod sink; +mod source; pub fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + env_logger::init(); sink::register(plugin)?; + source::register(plugin)?; + Ok(()) } diff --git a/src/sink/imp.rs b/src/sink/imp.rs index d0cba01..5b62884 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -94,7 +94,7 @@ impl ElementImpl for MoqSink { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( "MoQ Sink", - "Sink", + "Sink/Network/MoQ", "Transmits media over the network via MoQ", "Luke Curley ", ) diff --git a/src/sink/mod.rs b/src/sink/mod.rs index a669e23..17dbc07 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -8,6 +8,5 @@ glib::wrapper! { } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - env_logger::init(); gst::Element::register(Some(plugin), "moqsink", gst::Rank::NONE, MoqSink::static_type()) } diff --git a/src/source/imp.rs b/src/source/imp.rs new file mode 100644 index 0000000..8ea7c87 --- /dev/null +++ b/src/source/imp.rs @@ -0,0 +1,260 @@ +use anyhow::Context as _; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use moq_karp::moq_transfork; + +use moq_native::{quic, tls}; +use once_cell::sync::Lazy; +use std::sync::LazyLock; +use std::sync::Mutex; + +static CAT: Lazy = + Lazy::new(|| gst::DebugCategory::new("moqsrc", gst::DebugColorFlags::empty(), Some("MoQ Source Element"))); + +pub static RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .unwrap() +}); + +#[derive(Default, Clone)] +struct Settings { + pub url: String, + pub tls_disable_verify: bool, +} + +#[derive(Default)] +pub(crate) struct MoqSrcPad {} + +#[glib::object_subclass] +impl ObjectSubclass for MoqSrcPad { + const NAME: &'static str = "MoqSrcPad"; + type Type = super::MoqSrcPad; + type ParentType = gst::GhostPad; +} + +impl ObjectImpl for MoqSrcPad {} +impl GstObjectImpl for MoqSrcPad {} +impl PadImpl for MoqSrcPad {} +impl ProxyPadImpl for MoqSrcPad {} +impl GhostPadImpl for MoqSrcPad {} + +#[derive(Default)] +pub struct MoqSrc { + settings: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for MoqSrc { + const NAME: &'static str = "MoqSrc"; + type Type = super::MoqSrc; + type ParentType = gst::Bin; + type Interfaces = (gst::ChildProxy,); + + fn new() -> Self { + Self::default() + } +} + +impl GstObjectImpl for MoqSrc {} +impl BinImpl for MoqSrc {} + +impl ObjectImpl for MoqSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("url") + .nick("Source URL") + .blurb("Connect to the given URL") + .build(), + glib::ParamSpecBoolean::builder("tls-disable-verify") + .nick("TLS disable verify") + .blurb("Disable TLS verification") + .default_value(false) + .build(), + ] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + + match pspec.name() { + "url" => settings.url = value.get().unwrap(), + "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + + match pspec.name() { + "url" => settings.url.to_value(), + "tls-disable-verify" => settings.tls_disable_verify.to_value(), + _ => unimplemented!(), + } + } +} + +impl ElementImpl for MoqSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "MoQ Src", + "Source/Network/MoQ", + "Receives media over the network via MoQ", + "Luke Curley ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + // Caps are restricted by the cmafmux element negotiation inside our bin element + let sink_pad_template = gst::PadTemplate::with_gtype( + "src_%u", + gst::PadDirection::Src, + gst::PadPresence::Request, + &gst::Caps::new_any(), + super::MoqSrcPad::static_type(), + ) + .unwrap(); + + vec![sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state(&self, transition: gst::StateChange) -> Result { + match transition { + gst::StateChange::ReadyToPaused => { + if let Err(e) = self.setup() { + gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); + return Err(gst::StateChangeError); + } + } + + gst::StateChange::PausedToReady => { + // Cleanup publisher + self.cleanup(); + } + + _ => (), + } + + // Chain up + self.parent_change_state(transition) + } +} + +impl ChildProxyImpl for MoqSrc { + fn children_count(&self) -> u32 { + let object = self.obj(); + object.num_pads() as u32 + } + + fn child_by_name(&self, name: &str) -> Option { + let object = self.obj(); + object.pads().into_iter().find(|p| p.name() == name).map(|p| p.upcast()) + } + + fn child_by_index(&self, index: u32) -> Option { + let object = self.obj(); + object.pads().into_iter().nth(index as usize).map(|p| p.upcast()) + } +} + +impl MoqSrc { + fn setup(&self) -> anyhow::Result<()> { + let settings = self.settings.lock().unwrap(); + let url = url::Url::parse(&settings.url)?; + let path = url.path().to_string(); + + // TODO support TLS certs and other options + let config = quic::Args { + bind: "[::]:0".parse().unwrap(), + tls: tls::Args { + disable_verify: settings.tls_disable_verify, + ..Default::default() + }, + } + .load()?; + drop(settings); + + let client = quic::Endpoint::new(config)?.client; + + let (broadcast, catalog) = RUNTIME.block_on(async { + let session = client.connect(url).await?; + let session = moq_transfork::Session::connect(session).await?; + let mut broadcast = moq_karp::BroadcastConsumer::new(session, path); + + // TODO handle catalog updates + let catalog = broadcast.next_catalog().await?.context("no catalog found")?.clone(); + + Ok::<_, anyhow::Error>((broadcast, catalog)) + })?; + + for video in catalog.video { + let mut track = broadcast.track(&video.track)?; + + let caps = gst::Caps::builder("video") + .field("codec", video.codec.to_string()) + .field("width", video.resolution.width) + .field("height", video.resolution.height) + .build(); + + let appsrc = gst::ElementFactory::make("appsrc") + .name(&video.track.name) + .build() + .unwrap(); + + appsrc.set_property("format", gst::Format::Time); + appsrc.set_property("caps", &caps); + appsrc.set_property("blocksize", 4096u32); + + self.obj().add(&appsrc)?; + + let srcpad = appsrc.static_pad("src").unwrap(); + let ghostpad = gst::GhostPad::with_target(&srcpad).unwrap(); + ghostpad.set_active(true)?; + self.obj().add_pad(&ghostpad)?; + + tokio::spawn(async move { + // TODO don't panic on error + while let Some(frame) = track.read().await.expect("failed to read frame") { + // TODO + let buffer = gst::Buffer::from_slice(frame.payload); + /* + buffer.set_pts(Some(gst::ClockTime::from_nseconds(frame.timestamp.as_nanos() as _))); + + let flags = buffer.flags(); + if frame.keyframe { + buffer.set_flags(flags | gst::BufferFlags::MARKER); + } else { + buffer.set_flags(flags & !gst::BufferFlags::DELTA_UNIT); + } + */ + + appsrc.emit_by_name::<()>("push-buffer", &[&buffer]); + } + }); + } + + for audio in catalog.audio {} + + Ok(()) + } + + fn cleanup(&self) { + // TODO kill spawned tasks + } +} diff --git a/src/source/mod.rs b/src/source/mod.rs new file mode 100644 index 0000000..796eddc --- /dev/null +++ b/src/source/mod.rs @@ -0,0 +1,16 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct MoqSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy; +} + +glib::wrapper! { + pub(crate) struct MoqSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register(Some(plugin), "moqsrc", gst::Rank::NONE, MoqSrc::static_type()) +} From 4e5ef1e8f9f4fd748992fe92dcafdb9aeafa33ef Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 19 Mar 2025 09:52:07 -0700 Subject: [PATCH 2/7] WIP --- Cargo.lock | 29 ++++++++++++ Cargo.toml | 1 + justfile | 5 ++- src/source/imp.rs | 109 +++++++++++++++++++++++++++++----------------- 4 files changed, 102 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7fdd36..4bf2db4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -736,6 +736,34 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "gstreamer-app" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9a883eb21aebcf1289158225c05f7aea5da6ecf71fa7f0ff1ce4d25baf004e" +dependencies = [ + "futures-core", + "futures-sink", + "glib", + "gstreamer", + "gstreamer-app-sys", + "gstreamer-base", + "libc", +] + +[[package]] +name = "gstreamer-app-sys" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f7ef838306fe51852d503a14dc79ac42de005a59008a05098de3ecdaf05455" +dependencies = [ + "glib-sys", + "gstreamer-base-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + [[package]] name = "gstreamer-base" version = "0.23.5" @@ -1289,6 +1317,7 @@ dependencies = [ "env_logger", "gst-plugin-version-helper", "gstreamer", + "gstreamer-app", "gstreamer-base", "moq-karp", "moq-native", diff --git a/Cargo.toml b/Cargo.toml index 7e576c0..8510b0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ moq-karp = "0.14" gst = { package = "gstreamer", version = "0.23" } gst-base = { package = "gstreamer-base", version = "0.23" } +gst-app = { package = "gstreamer-app", version = "0.23" } once_cell = "1" tokio = { version = "1", features = ["full"] } diff --git a/justfile b/justfile index 51b6db7..e189723 100644 --- a/justfile +++ b/justfile @@ -5,6 +5,7 @@ export RUST_BACKTRACE := "1" export RUST_LOG := "info" export URL := "http://localhost:4443" +export GST_DEBUG := "*:4" # List all of the available commands. default: @@ -56,7 +57,7 @@ sub: # Run gstreamer and pipe the output to our plugin # This will render the video to the screen GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ - gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! autovideosink + gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! fakesink # Run the CI checks check $RUSTFLAGS="-D warnings": @@ -86,4 +87,4 @@ upgrade: # Build the plugins build: - cargo build \ No newline at end of file + cargo build diff --git a/src/source/imp.rs b/src/source/imp.rs index 8ea7c87..5b89c9b 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -118,17 +118,15 @@ impl ElementImpl for MoqSrc { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - // Caps are restricted by the cmafmux element negotiation inside our bin element - let sink_pad_template = gst::PadTemplate::with_gtype( - "src_%u", + let pad = gst::PadTemplate::new( + "src", gst::PadDirection::Src, - gst::PadPresence::Request, + gst::PadPresence::Sometimes, &gst::Caps::new_any(), - super::MoqSrcPad::static_type(), ) .unwrap(); - vec![sink_pad_template] + vec![pad] }); PAD_TEMPLATES.as_ref() @@ -137,10 +135,13 @@ impl ElementImpl for MoqSrc { fn change_state(&self, transition: gst::StateChange) -> Result { match transition { gst::StateChange::ReadyToPaused => { - if let Err(e) = self.setup() { + if let Err(e) = RUNTIME.block_on(self.setup()) { gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); return Err(gst::StateChangeError); } + + // We downloaded the catalog and created all the pads. + self.obj().no_more_pads(); } gst::StateChange::PausedToReady => { @@ -174,7 +175,7 @@ impl ChildProxyImpl for MoqSrc { } impl MoqSrc { - fn setup(&self) -> anyhow::Result<()> { + async fn setup(&self) -> anyhow::Result<()> { let settings = self.settings.lock().unwrap(); let url = url::Url::parse(&settings.url)?; let path = url.path().to_string(); @@ -192,60 +193,88 @@ impl MoqSrc { let client = quic::Endpoint::new(config)?.client; - let (broadcast, catalog) = RUNTIME.block_on(async { - let session = client.connect(url).await?; - let session = moq_transfork::Session::connect(session).await?; - let mut broadcast = moq_karp::BroadcastConsumer::new(session, path); + let session = client.connect(url).await?; + let session = moq_transfork::Session::connect(session).await?; + let mut broadcast = moq_karp::BroadcastConsumer::new(session, path); - // TODO handle catalog updates - let catalog = broadcast.next_catalog().await?.context("no catalog found")?.clone(); + // TODO handle catalog updates + let catalog = broadcast.next_catalog().await?.context("no catalog found")?.clone(); - Ok::<_, anyhow::Error>((broadcast, catalog)) - })?; + gst::info!(CAT, "catalog: {:?}", catalog); for video in catalog.video { let mut track = broadcast.track(&video.track)?; - let caps = gst::Caps::builder("video") - .field("codec", video.codec.to_string()) - .field("width", video.resolution.width) - .field("height", video.resolution.height) - .build(); + let caps = match video.codec { + moq_karp::VideoCodec::H264(_) => { + let builder = gst::Caps::builder("video/x-h264") + .field("width", video.resolution.width) + .field("height", video.resolution.height) + .field("alignment", "au"); + + if let Some(description) = video.description { + builder + .field("stream-format", "avc") + .field("codec_data", gst::Buffer::from_slice(description.clone())) + .build() + } else { + builder.field("stream-format", "annexb").build() + } + } + _ => unimplemented!(), + }; - let appsrc = gst::ElementFactory::make("appsrc") + let appsrc = gst_app::AppSrc::builder() .name(&video.track.name) - .build() - .unwrap(); + .caps(&caps) + .format(gst::Format::Time) + .is_live(true) + .stream_type(gst_app::AppStreamType::Stream) + .build(); - appsrc.set_property("format", gst::Format::Time); - appsrc.set_property("caps", &caps); - appsrc.set_property("blocksize", 4096u32); + let appsrc_pad = appsrc.static_pad("src").unwrap(); + gst::info!(CAT, "AppSrc linked to: {:?}", appsrc_pad.peer()); - self.obj().add(&appsrc)?; + let srcpad = gst::GhostPad::with_target(&appsrc_pad).unwrap(); + srcpad.set_active(true)?; + self.obj().add_pad(&srcpad)?; - let srcpad = appsrc.static_pad("src").unwrap(); - let ghostpad = gst::GhostPad::with_target(&srcpad).unwrap(); - ghostpad.set_active(true)?; - self.obj().add_pad(&ghostpad)?; + self.obj().add(&appsrc)?; tokio::spawn(async move { // TODO don't panic on error while let Some(frame) = track.read().await.expect("failed to read frame") { // TODO - let buffer = gst::Buffer::from_slice(frame.payload); - /* - buffer.set_pts(Some(gst::ClockTime::from_nseconds(frame.timestamp.as_nanos() as _))); + let mut buffer = gst::Buffer::from_slice(frame.payload); + let buffer_mut = buffer.get_mut().unwrap(); + + let pts = gst::ClockTime::from_nseconds(frame.timestamp.as_nanos() as _); + buffer_mut.set_pts(Some(pts)); - let flags = buffer.flags(); + let mut flags = buffer_mut.flags(); if frame.keyframe { - buffer.set_flags(flags | gst::BufferFlags::MARKER); + flags.insert(gst::BufferFlags::MARKER); } else { - buffer.set_flags(flags & !gst::BufferFlags::DELTA_UNIT); + flags.insert(gst::BufferFlags::DELTA_UNIT); + } + buffer_mut.set_flags(flags); + + // Ensure appsrc has caps set + if appsrc.caps().is_none() { + gst::error!(CAT, "AppSrc missing caps!"); + break; } - */ - appsrc.emit_by_name::<()>("push-buffer", &[&buffer]); + let sample = gst::Sample::builder() + .buffer(&buffer) + .caps(&appsrc.caps().unwrap()) + .build(); + if let Err(err) = appsrc.push_sample(&sample) { + gst::warning!(CAT, "Failed to push sample: {:?}", err); + } } + + appsrc.end_of_stream().unwrap(); }); } From 2ffab8b1aadcf979c48414e4dd077e6bad1dee35 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 19 Mar 2025 15:51:20 -0700 Subject: [PATCH 3/7] Trying my best. --- justfile | 4 ++-- src/source/imp.rs | 37 ++++++++++++------------------------- src/source/mod.rs | 2 +- 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/justfile b/justfile index e189723..bc04c09 100644 --- a/justfile +++ b/justfile @@ -5,7 +5,7 @@ export RUST_BACKTRACE := "1" export RUST_LOG := "info" export URL := "http://localhost:4443" -export GST_DEBUG := "*:4" +export GST_DEBUG:="*:3" # List all of the available commands. default: @@ -57,7 +57,7 @@ sub: # Run gstreamer and pipe the output to our plugin # This will render the video to the screen GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ - gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! fakesink + gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! queue ! h264parse ! mp4mux ! filesink location=output.mp4 # Run the CI checks check $RUSTFLAGS="-D warnings": diff --git a/src/source/imp.rs b/src/source/imp.rs index 5b89c9b..5d7b007 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -53,7 +53,6 @@ impl ObjectSubclass for MoqSrc { const NAME: &'static str = "MoqSrc"; type Type = super::MoqSrc; type ParentType = gst::Bin; - type Interfaces = (gst::ChildProxy,); fn new() -> Self { Self::default() @@ -119,7 +118,7 @@ impl ElementImpl for MoqSrc { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { let pad = gst::PadTemplate::new( - "src", + "src_%u", gst::PadDirection::Src, gst::PadPresence::Sometimes, &gst::Caps::new_any(), @@ -139,9 +138,6 @@ impl ElementImpl for MoqSrc { gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); return Err(gst::StateChangeError); } - - // We downloaded the catalog and created all the pads. - self.obj().no_more_pads(); } gst::StateChange::PausedToReady => { @@ -157,23 +153,6 @@ impl ElementImpl for MoqSrc { } } -impl ChildProxyImpl for MoqSrc { - fn children_count(&self) -> u32 { - let object = self.obj(); - object.num_pads() as u32 - } - - fn child_by_name(&self, name: &str) -> Option { - let object = self.obj(); - object.pads().into_iter().find(|p| p.name() == name).map(|p| p.upcast()) - } - - fn child_by_index(&self, index: u32) -> Option { - let object = self.obj(); - object.pads().into_iter().nth(index as usize).map(|p| p.upcast()) - } -} - impl MoqSrc { async fn setup(&self) -> anyhow::Result<()> { let settings = self.settings.lock().unwrap(); @@ -230,15 +209,20 @@ impl MoqSrc { .format(gst::Format::Time) .is_live(true) .stream_type(gst_app::AppStreamType::Stream) + .do_timestamp(true) .build(); let appsrc_pad = appsrc.static_pad("src").unwrap(); - gst::info!(CAT, "AppSrc linked to: {:?}", appsrc_pad.peer()); - let srcpad = gst::GhostPad::with_target(&appsrc_pad).unwrap(); + let templ = self.obj().pad_template("src_%u").unwrap(); + let srcpad = gst::GhostPad::builder_from_template(&templ) + .name(format!("src_{}", 0)) + .build(); + + srcpad.set_target(Some(&appsrc_pad))?; srcpad.set_active(true)?; - self.obj().add_pad(&srcpad)?; + self.obj().add_pad(&srcpad)?; self.obj().add(&appsrc)?; tokio::spawn(async move { @@ -280,6 +264,9 @@ impl MoqSrc { for audio in catalog.audio {} + // We downloaded the catalog and created all the pads. + self.obj().no_more_pads(); + Ok(()) } diff --git a/src/source/mod.rs b/src/source/mod.rs index 796eddc..2f4d42b 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -4,7 +4,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct MoqSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy; + pub struct MoqSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; } glib::wrapper! { From 7c9c174e6f77eb481f5d7b34371a7fe643b1039d Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 26 Mar 2025 08:24:31 -0700 Subject: [PATCH 4/7] WIP --- Cargo.toml | 2 +- justfile | 4 +-- src/sink/imp.rs | 2 +- src/source/imp.rs | 92 +++++++++++++++++++---------------------------- src/source/mod.rs | 4 --- 5 files changed, 40 insertions(+), 64 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8510b0a..9a69b09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ moq-karp = "0.14" gst = { package = "gstreamer", version = "0.23" } gst-base = { package = "gstreamer-base", version = "0.23" } -gst-app = { package = "gstreamer-app", version = "0.23" } +gst-app = { package = "gstreamer-app", version = "0.23", features = ["v1_20"] } once_cell = "1" tokio = { version = "1", features = ["full"] } diff --git a/justfile b/justfile index bc04c09..8661d13 100644 --- a/justfile +++ b/justfile @@ -5,7 +5,7 @@ export RUST_BACKTRACE := "1" export RUST_LOG := "info" export URL := "http://localhost:4443" -export GST_DEBUG:="*:3" +export GST_DEBUG:="*:4" # List all of the available commands. default: @@ -57,7 +57,7 @@ sub: # Run gstreamer and pipe the output to our plugin # This will render the video to the screen GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ - gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! queue ! h264parse ! mp4mux ! filesink location=output.mp4 + gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! decodebin ! videoconvert ! autovideosink #mp4mux fragment-duration=100 streamable=true ! filesink location=output.mp4 # Run the CI checks check $RUSTFLAGS="-D warnings": diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 5b62884..c377c4d 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -167,7 +167,7 @@ impl MoqSink { .await .expect("failed to connect"); - let path = url.path().to_string(); + let path = url.path().strip_prefix("/").unwrap().to_string(); let broadcast = moq_karp::BroadcastProducer::new(session, path).unwrap(); let media = moq_karp::cmaf::Import::new(broadcast); diff --git a/src/source/imp.rs b/src/source/imp.rs index 5d7b007..1708e89 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -27,22 +27,6 @@ struct Settings { pub tls_disable_verify: bool, } -#[derive(Default)] -pub(crate) struct MoqSrcPad {} - -#[glib::object_subclass] -impl ObjectSubclass for MoqSrcPad { - const NAME: &'static str = "MoqSrcPad"; - type Type = super::MoqSrcPad; - type ParentType = gst::GhostPad; -} - -impl ObjectImpl for MoqSrcPad {} -impl GstObjectImpl for MoqSrcPad {} -impl PadImpl for MoqSrcPad {} -impl ProxyPadImpl for MoqSrcPad {} -impl GhostPadImpl for MoqSrcPad {} - #[derive(Default)] pub struct MoqSrc { settings: Mutex, @@ -117,15 +101,23 @@ impl ElementImpl for MoqSrc { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - let pad = gst::PadTemplate::new( - "src_%u", + let video = gst::PadTemplate::new( + "video_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + + let audio = gst::PadTemplate::new( + "audio_%u", gst::PadDirection::Src, gst::PadPresence::Sometimes, &gst::Caps::new_any(), ) .unwrap(); - vec![pad] + vec![video, audio] }); PAD_TEMPLATES.as_ref() @@ -157,7 +149,7 @@ impl MoqSrc { async fn setup(&self) -> anyhow::Result<()> { let settings = self.settings.lock().unwrap(); let url = url::Url::parse(&settings.url)?; - let path = url.path().to_string(); + let path = url.path().strip_prefix("/").unwrap().to_string(); // TODO support TLS certs and other options let config = quic::Args { @@ -187,8 +179,8 @@ impl MoqSrc { let caps = match video.codec { moq_karp::VideoCodec::H264(_) => { let builder = gst::Caps::builder("video/x-h264") - .field("width", video.resolution.width) - .field("height", video.resolution.height) + //.field("width", video.resolution.width) + //.field("height", video.resolution.height) .field("alignment", "au"); if let Some(description) = video.description { @@ -203,32 +195,30 @@ impl MoqSrc { _ => unimplemented!(), }; - let appsrc = gst_app::AppSrc::builder() - .name(&video.track.name) - .caps(&caps) - .format(gst::Format::Time) - .is_live(true) - .stream_type(gst_app::AppStreamType::Stream) - .do_timestamp(true) - .build(); + gst::info!(CAT, "caps: {:?}", caps); - let appsrc_pad = appsrc.static_pad("src").unwrap(); + let templ = self.obj().element_class().pad_template("video_%u").unwrap(); - let templ = self.obj().pad_template("src_%u").unwrap(); - let srcpad = gst::GhostPad::builder_from_template(&templ) - .name(format!("src_{}", 0)) + let srcpad = gst::Pad::builder_from_template(&templ).name(&video.track.name).build(); + srcpad.set_active(true).unwrap(); + + let stream_start = gst::event::StreamStart::builder(&video.track.name) + .group_id(gst::GroupId::next()) .build(); + srcpad.push_event(stream_start); + + let caps_evt = gst::event::Caps::new(&caps); + srcpad.push_event(caps_evt); - srcpad.set_target(Some(&appsrc_pad))?; - srcpad.set_active(true)?; + let segment = gst::event::Segment::new(&gst::FormattedSegment::::new()); + srcpad.push_event(segment); - self.obj().add_pad(&srcpad)?; - self.obj().add(&appsrc)?; + self.obj().add_pad(&srcpad).expect("Failed to add pad"); + // Push to the srcpad in a background task. tokio::spawn(async move { // TODO don't panic on error while let Some(frame) = track.read().await.expect("failed to read frame") { - // TODO let mut buffer = gst::Buffer::from_slice(frame.payload); let buffer_mut = buffer.get_mut().unwrap(); @@ -236,29 +226,19 @@ impl MoqSrc { buffer_mut.set_pts(Some(pts)); let mut flags = buffer_mut.flags(); - if frame.keyframe { - flags.insert(gst::BufferFlags::MARKER); - } else { - flags.insert(gst::BufferFlags::DELTA_UNIT); - } + match frame.keyframe { + true => flags.remove(gst::BufferFlags::DELTA_UNIT), + false => flags.insert(gst::BufferFlags::DELTA_UNIT), + }; + buffer_mut.set_flags(flags); - // Ensure appsrc has caps set - if appsrc.caps().is_none() { - gst::error!(CAT, "AppSrc missing caps!"); - break; - } + gst::info!(CAT, "pushing sample: {:?}", buffer); - let sample = gst::Sample::builder() - .buffer(&buffer) - .caps(&appsrc.caps().unwrap()) - .build(); - if let Err(err) = appsrc.push_sample(&sample) { + if let Err(err) = srcpad.push(buffer) { gst::warning!(CAT, "Failed to push sample: {:?}", err); } } - - appsrc.end_of_stream().unwrap(); }); } diff --git a/src/source/mod.rs b/src/source/mod.rs index 2f4d42b..b41daca 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -7,10 +7,6 @@ glib::wrapper! { pub struct MoqSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; } -glib::wrapper! { - pub(crate) struct MoqSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; -} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register(Some(plugin), "moqsrc", gst::Rank::NONE, MoqSrc::static_type()) } From e1646d376cbbfb1086684be0b68e02ff23c79fae Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 27 Mar 2025 16:16:49 -0700 Subject: [PATCH 5/7] Basic src plugin. --- justfile | 2 +- src/source/imp.rs | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/justfile b/justfile index 8661d13..213a099 100644 --- a/justfile +++ b/justfile @@ -5,7 +5,7 @@ export RUST_BACKTRACE := "1" export RUST_LOG := "info" export URL := "http://localhost:4443" -export GST_DEBUG:="*:4" +#export GST_DEBUG:="*:4" # List all of the available commands. default: diff --git a/src/source/imp.rs b/src/source/imp.rs index 1708e89..da15c56 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -215,6 +215,8 @@ impl MoqSrc { self.obj().add_pad(&srcpad).expect("Failed to add pad"); + let mut reference = None; + // Push to the srcpad in a background task. tokio::spawn(async move { // TODO don't panic on error @@ -222,7 +224,15 @@ impl MoqSrc { let mut buffer = gst::Buffer::from_slice(frame.payload); let buffer_mut = buffer.get_mut().unwrap(); - let pts = gst::ClockTime::from_nseconds(frame.timestamp.as_nanos() as _); + // Make the timestamps relative to the first frame + let timestamp = if let Some(reference) = reference { + frame.timestamp - reference + } else { + reference = Some(frame.timestamp); + frame.timestamp + }; + + let pts = gst::ClockTime::from_nseconds(timestamp.as_nanos() as _); buffer_mut.set_pts(Some(pts)); let mut flags = buffer_mut.flags(); From b2b15e9026180385e3e0612f6cbe4cbf5bac58c6 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 27 Mar 2025 16:18:21 -0700 Subject: [PATCH 6/7] Only render. --- justfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/justfile b/justfile index 213a099..300845b 100644 --- a/justfile +++ b/justfile @@ -57,7 +57,7 @@ sub: # Run gstreamer and pipe the output to our plugin # This will render the video to the screen GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ - gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! decodebin ! videoconvert ! autovideosink #mp4mux fragment-duration=100 streamable=true ! filesink location=output.mp4 + gst-launch-1.0 -v -e moqsrc url="$URL/demo/bbb" tls-disable-verify=true ! decodebin ! videoconvert ! autovideosink # Run the CI checks check $RUSTFLAGS="-D warnings": From 14f0e0176705f9d6119ff697fce1720e34eab4b2 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 27 Mar 2025 16:19:56 -0700 Subject: [PATCH 7/7] Don't hold the mutex over an await. --- src/source/imp.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/source/imp.rs b/src/source/imp.rs index da15c56..eaf9c53 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -147,22 +147,25 @@ impl ElementImpl for MoqSrc { impl MoqSrc { async fn setup(&self) -> anyhow::Result<()> { - let settings = self.settings.lock().unwrap(); - let url = url::Url::parse(&settings.url)?; - let path = url.path().strip_prefix("/").unwrap().to_string(); - - // TODO support TLS certs and other options - let config = quic::Args { - bind: "[::]:0".parse().unwrap(), - tls: tls::Args { - disable_verify: settings.tls_disable_verify, - ..Default::default() - }, - } - .load()?; - drop(settings); + let (quic, url, path) = { + let settings = self.settings.lock().unwrap(); + let url = url::Url::parse(&settings.url)?; + let path = url.path().strip_prefix("/").unwrap().to_string(); + + // TODO support TLS certs and other options + let quic = quic::Args { + bind: "[::]:0".parse().unwrap(), + tls: tls::Args { + disable_verify: settings.tls_disable_verify, + ..Default::default() + }, + }; + + (quic, url, path) + }; - let client = quic::Endpoint::new(config)?.client; + let quic = quic.load()?; + let client = quic::Endpoint::new(quic)?.client; let session = client.connect(url).await?; let session = moq_transfork::Session::connect(session).await?;