Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 78 additions & 56 deletions src/source/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,38 +233,49 @@ impl HangSrc {

self.obj().add_pad(&srcpad).expect("Failed to add pad");

let mut reference = None;

// Push to the srcpad in a background task.
let mut reference = None;
tokio::spawn(async move {
// TODO don't panic on error
while let Some(frame) = track.read().await.expect("failed to read frame") {
let mut buffer = gst::Buffer::from_slice(frame.payload);
let buffer_mut = buffer.get_mut().unwrap();

// Make the timestamps relative to the first frame
let timestamp = if let Some(reference) = reference {
frame.timestamp - reference
} else {
reference = Some(frame.timestamp);
frame.timestamp
};

let pts = gst::ClockTime::from_nseconds(timestamp.as_nanos() as _);
buffer_mut.set_pts(Some(pts));

let mut flags = buffer_mut.flags();
match frame.keyframe {
true => flags.remove(gst::BufferFlags::DELTA_UNIT),
false => flags.insert(gst::BufferFlags::DELTA_UNIT),
};

buffer_mut.set_flags(flags);

gst::info!(CAT, "pushing sample: {:?}", buffer);

if let Err(err) = srcpad.push(buffer) {
gst::warning!(CAT, "Failed to push sample: {:?}", err);
loop {
match track.read().await {
Ok(Some(frame)) => {
let mut buffer = gst::Buffer::from_slice(frame.payload);
let buffer_mut = buffer.get_mut().unwrap();

// Make timestamps relative to the first frame for proper playback
let pts = if let Some(reference_ts) = reference {
let timestamp: std::time::Duration = frame.timestamp - reference_ts;
gst::ClockTime::from_nseconds(timestamp.as_nanos() as _)
} else {
reference = Some(frame.timestamp);
gst::ClockTime::ZERO
};
buffer_mut.set_pts(Some(pts));

let mut flags = buffer_mut.flags();
match frame.keyframe {
true => flags.remove(gst::BufferFlags::DELTA_UNIT),
false => flags.insert(gst::BufferFlags::DELTA_UNIT),
};

buffer_mut.set_flags(flags);

gst::info!(CAT, "pushing sample: {:?}", buffer);

if let Err(err) = srcpad.push(buffer) {
gst::warning!(CAT, "Failed to push sample: {:?}", err);
}
}
Ok(None) => {
// Stream ended normally
gst::info!(CAT, "Stream ended normally");
break;
}
Err(e) => {
// Handle connection errors gracefully
gst::warning!(CAT, "Failed to read frame: {:?}", e);
break;
}
Comment on lines +269 to +278
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Signal downstream on termination: push EOS on end/error.

Currently we only log and break. Downstream may hang waiting. Push EOS before breaking so the pipeline can drain and finish cleanly.

-                        Ok(None) => {
-                            // Stream ended normally
-                            gst::info!(CAT, "Stream ended normally");
-                            break;
-                        }
+                        Ok(None) => {
+                            // Stream ended normally
+                            gst::info!(CAT, "Stream ended normally");
+                            let _ = srcpad.push_event(gst::event::Eos::new());
+                            break;
+                        }
-                        Err(e) => {
-                            // Handle connection errors gracefully
-                            gst::warning!(CAT, "Failed to read frame: {:?}", e);
-                            break;
-                        }
+                        Err(e) => {
+                            // Handle connection errors gracefully
+                            gst::warning!(CAT, "Failed to read frame: {:?}", e);
+                            let _ = srcpad.push_event(gst::event::Eos::new());
+                            break;
+                        }

Also applies to: 371-380

🤖 Prompt for AI Agents
In src/source/imp.rs around lines 270-279, the code just logs and breaks on
Ok(None) (normal stream end) and Err(e) (read error); change both branches to
first push an EOS downstream before breaking so downstream elements can drain
and finish: call the appropriate element/pad/event API to send an EOS event
(e.g., element.get_static_pad(...).push_event(gst::event::Eos::new()) or the
pipeline-level/send_event equivalent), handle/report any push_event error if
needed, then break; apply the same EOS-before-break change to the other
occurrence at lines 371-380.

}
}
});
Expand Down Expand Up @@ -326,34 +337,45 @@ impl HangSrc {

self.obj().add_pad(&srcpad).expect("Failed to add pad");

let mut reference = None;

// Push to the srcpad in a background task.
let mut reference = None;
tokio::spawn(async move {
// TODO don't panic on error
while let Some(frame) = track.read().await.expect("failed to read frame") {
let mut buffer = gst::Buffer::from_slice(frame.payload);
let buffer_mut = buffer.get_mut().unwrap();

// Make the timestamps relative to the first frame
let timestamp = if let Some(reference) = reference {
frame.timestamp - reference
} else {
reference = Some(frame.timestamp);
frame.timestamp
Copy link
Collaborator

@kixelated kixelated Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just needed to be 0 right? The rest of the changes only impact logging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct

};

let pts = gst::ClockTime::from_nseconds(timestamp.as_nanos() as _);
buffer_mut.set_pts(Some(pts));

let mut flags = buffer_mut.flags();
flags.remove(gst::BufferFlags::DELTA_UNIT);
buffer_mut.set_flags(flags);

gst::info!(CAT, "pushing sample: {:?}", buffer);

if let Err(err) = srcpad.push(buffer) {
gst::warning!(CAT, "Failed to push sample: {:?}", err);
loop {
match track.read().await {
Ok(Some(frame)) => {
let mut buffer = gst::Buffer::from_slice(frame.payload);
let buffer_mut = buffer.get_mut().unwrap();

// Make timestamps relative to the first frame for proper playback
let pts = if let Some(reference_ts) = reference {
let timestamp: std::time::Duration = frame.timestamp - reference_ts;
gst::ClockTime::from_nseconds(timestamp.as_nanos() as _)
} else {
reference = Some(frame.timestamp);
gst::ClockTime::ZERO
};
buffer_mut.set_pts(Some(pts));

let mut flags = buffer_mut.flags();
flags.remove(gst::BufferFlags::DELTA_UNIT);
buffer_mut.set_flags(flags);
Comment on lines +359 to +361
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that all audio frames should have DELTA_UNIT removed.

Unlike the video track which conditionally sets DELTA_UNIT based on the keyframe flag, the audio track unconditionally removes DELTA_UNIT from all frames. This may be intentional if all audio frames are keyframes, but please verify this is correct for your audio codec.


🏁 Script executed:

#!/bin/bash
# Description: Check if audio frames have keyframe information in the hang crate

# Search for audio frame structures and keyframe handling
rg -nP -A5 -B5 'struct.*Frame|keyframe' --type rust -g '!target/**'

Length of output: 148


🏁 Script executed:

#!/bin/bash
# First, let's examine the context around the audio frame handling in the file
echo "=== Context around lines 359-361 in src/source/imp.rs ==="
cat -n src/source/imp.rs | sed -n '340,380p'

Length of output: 1643


🏁 Script executed:

#!/bin/bash
# Search for DELTA_UNIT usage patterns across Rust files
echo "=== DELTA_UNIT usage patterns ==="
fd -e rs -x rg -n "DELTA_UNIT" {} \;

Length of output: 217


🏁 Script executed:

#!/bin/bash
# Search for audio and video frame handling patterns
echo "=== Audio/Video frame handling ==="
fd -e rs -x rg -nP -C3 "audio.*frame|video.*frame" -i {} \;

Length of output: 34


🏁 Script executed:

#!/bin/bash
# Look for keyframe information in the broader context
echo "=== Keyframe handling ==="
fd -e rs -x rg -nP -C3 "keyframe|key_frame" -i {} \;

Length of output: 296


🏁 Script executed:

#!/bin/bash
# Let's see more context to understand which track is audio vs video
echo "=== Lines 240-270 (video track with keyframe logic) ==="
cat -n src/source/imp.rs | sed -n '240,270p'

Length of output: 1336


🏁 Script executed:

#!/bin/bash
# Let's see what defines the frame type and its fields
echo "=== Frame type definition and imports ==="
rg -nP "struct.*Frame|use.*Frame|impl.*Frame" src/source/imp.rs

Length of output: 41


🏁 Script executed:

#!/bin/bash
# Check for track type identification (audio vs video)
echo "=== Track identification ==="
rg -nP -C5 "audio|video|Audio|Video" src/source/imp.rs | head -100

Length of output: 3321


🏁 Script executed:

#!/bin/bash
# Look for the frame field access patterns
echo "=== Frame field usage ==="
rg -nP "frame\.(keyframe|payload|timestamp)" src/source/imp.rs

Length of output: 451


🏁 Script executed:

#!/bin/bash
# Look for comments around the audio section
echo "=== Audio section with more context (lines 284-365) ==="
cat -n src/source/imp.rs | sed -n '284,365p'

Length of output: 3424


🏁 Script executed:

#!/bin/bash
# Search for hang crate usage and Frame types in Cargo.toml and other files
echo "=== Hang crate dependency ==="
rg -nP "hang\s*=" Cargo.toml

Length of output: 45


🏁 Script executed:

#!/bin/bash
# Check if there are any other references to keyframe in audio context
echo "=== All keyframe references with context ==="
rg -nP -C10 "keyframe" src/source/imp.rs

Length of output: 874


🏁 Script executed:

#!/bin/bash
# Look for Frame type definitions in the hang crate if it's a local dependency
echo "=== Looking for hang crate Frame definitions ==="
fd -t f "mod.rs|lib.rs" | xargs rg -nP "struct Frame|pub struct.*Frame" 2>/dev/null | head -20

Length of output: 48


🌐 Web query:

Rust hang crate version 0.5 Frame type keyframe field

💡 Result:

The Frame struct's keyframe field is a boolean: pub keyframe: bool. It appears as part of Frame: timestamp: Timestamp, keyframe: bool, payload: Bytes. [1][2]

Sources:
[1] docs.rs — hang::Frame (struct fields).
[2] lib.rs crate page for hang (release 0.5 notes & example).


Audio frames should check the keyframe field like video frames do.

The hang::Frame struct includes a keyframe: bool field for all frames (both audio and video). The video track correctly uses this field to conditionally set DELTA_UNIT:

// Video (lines 256-259)
match frame.keyframe {
    true => flags.remove(gst::BufferFlags::DELTA_UNIT),
    false => flags.insert(gst::BufferFlags::DELTA_UNIT),
};

However, the audio track unconditionally removes DELTA_UNIT without checking frame.keyframe. This is inconsistent and potentially incorrect. Apply the same conditional logic:

// Audio (lines 359-361) - should be updated to:
match frame.keyframe {
    true => flags.remove(gst::BufferFlags::DELTA_UNIT),
    false => flags.insert(gst::BufferFlags::DELTA_UNIT),
};
buffer_mut.set_flags(flags);
🤖 Prompt for AI Agents
In src/source/imp.rs around lines 359 to 361, the audio path unconditionally
removes GST DELTA_UNIT flag instead of checking frame.keyframe like the video
path does; update the audio handling to match video by inspecting frame.keyframe
and removing DELTA_UNIT when true or inserting DELTA_UNIT when false, then set
the buffer flags (i.e., replace the unconditional flags.remove call with the
same match on frame.keyframe used for video and call buffer_mut.set_flags(flags)
afterwards).


gst::info!(CAT, "pushing sample: {:?}", buffer);

if let Err(err) = srcpad.push(buffer) {
gst::warning!(CAT, "Failed to push sample: {:?}", err);
}
}
Ok(None) => {
// Stream ended normally
gst::info!(CAT, "Stream ended normally");
break;
}
Err(e) => {
// Handle connection errors gracefully
gst::warning!(CAT, "Failed to read frame: {:?}", e);
break;
}
}
}
});
Expand Down