-
Notifications
You must be signed in to change notification settings - Fork 10
Fix timestamp handling and error recovery in video/audio streaming #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -233,38 +233,49 @@ impl HangSrc { | |
|
|
||
| self.obj().add_pad(&srcpad).expect("Failed to add pad"); | ||
|
|
||
| let mut reference = None; | ||
|
|
||
| // Push to the srcpad in a background task. | ||
| let mut reference = None; | ||
| tokio::spawn(async move { | ||
| // TODO don't panic on error | ||
| while let Some(frame) = track.read().await.expect("failed to read frame") { | ||
| let mut buffer = gst::Buffer::from_slice(frame.payload); | ||
| let buffer_mut = buffer.get_mut().unwrap(); | ||
|
|
||
| // Make the timestamps relative to the first frame | ||
| let timestamp = if let Some(reference) = reference { | ||
| frame.timestamp - reference | ||
| } else { | ||
| reference = Some(frame.timestamp); | ||
| frame.timestamp | ||
| }; | ||
|
|
||
| let pts = gst::ClockTime::from_nseconds(timestamp.as_nanos() as _); | ||
| buffer_mut.set_pts(Some(pts)); | ||
|
|
||
| let mut flags = buffer_mut.flags(); | ||
| match frame.keyframe { | ||
| true => flags.remove(gst::BufferFlags::DELTA_UNIT), | ||
| false => flags.insert(gst::BufferFlags::DELTA_UNIT), | ||
| }; | ||
|
|
||
| buffer_mut.set_flags(flags); | ||
|
|
||
| gst::info!(CAT, "pushing sample: {:?}", buffer); | ||
|
|
||
| if let Err(err) = srcpad.push(buffer) { | ||
| gst::warning!(CAT, "Failed to push sample: {:?}", err); | ||
| loop { | ||
| match track.read().await { | ||
| Ok(Some(frame)) => { | ||
| let mut buffer = gst::Buffer::from_slice(frame.payload); | ||
| let buffer_mut = buffer.get_mut().unwrap(); | ||
|
|
||
| // Make timestamps relative to the first frame for proper playback | ||
| let pts = if let Some(reference_ts) = reference { | ||
| let timestamp: std::time::Duration = frame.timestamp - reference_ts; | ||
| gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) | ||
| } else { | ||
| reference = Some(frame.timestamp); | ||
| gst::ClockTime::ZERO | ||
| }; | ||
| buffer_mut.set_pts(Some(pts)); | ||
|
|
||
| let mut flags = buffer_mut.flags(); | ||
| match frame.keyframe { | ||
| true => flags.remove(gst::BufferFlags::DELTA_UNIT), | ||
| false => flags.insert(gst::BufferFlags::DELTA_UNIT), | ||
| }; | ||
|
|
||
| buffer_mut.set_flags(flags); | ||
|
|
||
| gst::info!(CAT, "pushing sample: {:?}", buffer); | ||
|
|
||
| if let Err(err) = srcpad.push(buffer) { | ||
| gst::warning!(CAT, "Failed to push sample: {:?}", err); | ||
| } | ||
| } | ||
| Ok(None) => { | ||
| // Stream ended normally | ||
| gst::info!(CAT, "Stream ended normally"); | ||
| break; | ||
| } | ||
| Err(e) => { | ||
| // Handle connection errors gracefully | ||
| gst::warning!(CAT, "Failed to read frame: {:?}", e); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
@@ -326,34 +337,45 @@ impl HangSrc { | |
|
|
||
| self.obj().add_pad(&srcpad).expect("Failed to add pad"); | ||
|
|
||
| let mut reference = None; | ||
|
|
||
| // Push to the srcpad in a background task. | ||
| let mut reference = None; | ||
| tokio::spawn(async move { | ||
| // TODO don't panic on error | ||
| while let Some(frame) = track.read().await.expect("failed to read frame") { | ||
| let mut buffer = gst::Buffer::from_slice(frame.payload); | ||
| let buffer_mut = buffer.get_mut().unwrap(); | ||
|
|
||
| // Make the timestamps relative to the first frame | ||
| let timestamp = if let Some(reference) = reference { | ||
| frame.timestamp - reference | ||
| } else { | ||
| reference = Some(frame.timestamp); | ||
| frame.timestamp | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This just needed to be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. correct |
||
| }; | ||
|
|
||
| let pts = gst::ClockTime::from_nseconds(timestamp.as_nanos() as _); | ||
| buffer_mut.set_pts(Some(pts)); | ||
|
|
||
| let mut flags = buffer_mut.flags(); | ||
| flags.remove(gst::BufferFlags::DELTA_UNIT); | ||
| buffer_mut.set_flags(flags); | ||
|
|
||
| gst::info!(CAT, "pushing sample: {:?}", buffer); | ||
|
|
||
| if let Err(err) = srcpad.push(buffer) { | ||
| gst::warning!(CAT, "Failed to push sample: {:?}", err); | ||
| loop { | ||
| match track.read().await { | ||
| Ok(Some(frame)) => { | ||
| let mut buffer = gst::Buffer::from_slice(frame.payload); | ||
| let buffer_mut = buffer.get_mut().unwrap(); | ||
|
|
||
| // Make timestamps relative to the first frame for proper playback | ||
| let pts = if let Some(reference_ts) = reference { | ||
| let timestamp: std::time::Duration = frame.timestamp - reference_ts; | ||
| gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) | ||
| } else { | ||
| reference = Some(frame.timestamp); | ||
| gst::ClockTime::ZERO | ||
| }; | ||
| buffer_mut.set_pts(Some(pts)); | ||
|
|
||
| let mut flags = buffer_mut.flags(); | ||
| flags.remove(gst::BufferFlags::DELTA_UNIT); | ||
| buffer_mut.set_flags(flags); | ||
|
Comment on lines
+359
to
+361
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainVerify that all audio frames should have DELTA_UNIT removed. Unlike the video track which conditionally sets DELTA_UNIT based on the keyframe flag, the audio track unconditionally removes DELTA_UNIT from all frames. This may be intentional if all audio frames are keyframes, but please verify this is correct for your audio codec. 🏁 Script executed: #!/bin/bash
# Description: Check if audio frames have keyframe information in the hang crate
# Search for audio frame structures and keyframe handling
rg -nP -A5 -B5 'struct.*Frame|keyframe' --type rust -g '!target/**'Length of output: 148 🏁 Script executed: #!/bin/bash
# First, let's examine the context around the audio frame handling in the file
echo "=== Context around lines 359-361 in src/source/imp.rs ==="
cat -n src/source/imp.rs | sed -n '340,380p'Length of output: 1643 🏁 Script executed: #!/bin/bash
# Search for DELTA_UNIT usage patterns across Rust files
echo "=== DELTA_UNIT usage patterns ==="
fd -e rs -x rg -n "DELTA_UNIT" {} \;Length of output: 217 🏁 Script executed: #!/bin/bash
# Search for audio and video frame handling patterns
echo "=== Audio/Video frame handling ==="
fd -e rs -x rg -nP -C3 "audio.*frame|video.*frame" -i {} \;Length of output: 34 🏁 Script executed: #!/bin/bash
# Look for keyframe information in the broader context
echo "=== Keyframe handling ==="
fd -e rs -x rg -nP -C3 "keyframe|key_frame" -i {} \;Length of output: 296 🏁 Script executed: #!/bin/bash
# Let's see more context to understand which track is audio vs video
echo "=== Lines 240-270 (video track with keyframe logic) ==="
cat -n src/source/imp.rs | sed -n '240,270p'Length of output: 1336 🏁 Script executed: #!/bin/bash
# Let's see what defines the frame type and its fields
echo "=== Frame type definition and imports ==="
rg -nP "struct.*Frame|use.*Frame|impl.*Frame" src/source/imp.rsLength of output: 41 🏁 Script executed: #!/bin/bash
# Check for track type identification (audio vs video)
echo "=== Track identification ==="
rg -nP -C5 "audio|video|Audio|Video" src/source/imp.rs | head -100Length of output: 3321 🏁 Script executed: #!/bin/bash
# Look for the frame field access patterns
echo "=== Frame field usage ==="
rg -nP "frame\.(keyframe|payload|timestamp)" src/source/imp.rsLength of output: 451 🏁 Script executed: #!/bin/bash
# Look for comments around the audio section
echo "=== Audio section with more context (lines 284-365) ==="
cat -n src/source/imp.rs | sed -n '284,365p'Length of output: 3424 🏁 Script executed: #!/bin/bash
# Search for hang crate usage and Frame types in Cargo.toml and other files
echo "=== Hang crate dependency ==="
rg -nP "hang\s*=" Cargo.tomlLength of output: 45 🏁 Script executed: #!/bin/bash
# Check if there are any other references to keyframe in audio context
echo "=== All keyframe references with context ==="
rg -nP -C10 "keyframe" src/source/imp.rsLength of output: 874 🏁 Script executed: #!/bin/bash
# Look for Frame type definitions in the hang crate if it's a local dependency
echo "=== Looking for hang crate Frame definitions ==="
fd -t f "mod.rs|lib.rs" | xargs rg -nP "struct Frame|pub struct.*Frame" 2>/dev/null | head -20Length of output: 48 🌐 Web query: 💡 Result: The Frame struct's keyframe field is a boolean: Sources: Audio frames should check the keyframe field like video frames do. The // Video (lines 256-259)
match frame.keyframe {
true => flags.remove(gst::BufferFlags::DELTA_UNIT),
false => flags.insert(gst::BufferFlags::DELTA_UNIT),
};However, the audio track unconditionally removes DELTA_UNIT without checking // Audio (lines 359-361) - should be updated to:
match frame.keyframe {
true => flags.remove(gst::BufferFlags::DELTA_UNIT),
false => flags.insert(gst::BufferFlags::DELTA_UNIT),
};
buffer_mut.set_flags(flags);🤖 Prompt for AI Agents |
||
|
|
||
| gst::info!(CAT, "pushing sample: {:?}", buffer); | ||
|
|
||
| if let Err(err) = srcpad.push(buffer) { | ||
| gst::warning!(CAT, "Failed to push sample: {:?}", err); | ||
| } | ||
| } | ||
| Ok(None) => { | ||
| // Stream ended normally | ||
| gst::info!(CAT, "Stream ended normally"); | ||
| break; | ||
| } | ||
| Err(e) => { | ||
| // Handle connection errors gracefully | ||
| gst::warning!(CAT, "Failed to read frame: {:?}", e); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signal downstream on termination: push EOS on end/error.
Currently we only log and break. Downstream may hang waiting. Push EOS before breaking so the pipeline can drain and finish cleanly.
Also applies to: 371-380
🤖 Prompt for AI Agents