Skip to content

Commit c760d05

Browse files
committed
SampleBuilder: Use Jitter Buffer with performance improvements
1 parent dc29db1 commit c760d05

File tree

2 files changed

+48
-23
lines changed

2 files changed

+48
-23
lines changed

pkg/media/samplebuilder/samplebuilder.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"math"
99
"time"
1010

11+
"github.com/pion/interceptor/pkg/jitterbuffer"
1112
"github.com/pion/rtp"
1213
"github.com/pion/webrtc/v4/pkg/media"
1314
)
@@ -16,7 +17,7 @@ import (
1617
type SampleBuilder struct {
1718
maxLate uint16 // how many packets to wait until we get a valid Sample
1819
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
19-
buffer [math.MaxUint16 + 1]*rtp.Packet
20+
buffer *jitterbuffer.JitterBuffer
2021
preparedSamples [math.MaxUint16 + 1]*media.Sample
2122

2223
// Interface that allows us to take RTP packets to samples
@@ -60,7 +61,7 @@ type SampleBuilder struct {
6061
// The depacketizer extracts media samples from RTP packets.
6162
// Several depacketizers are available in package github.com/pion/rtp/codecs.
6263
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
63-
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
64+
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))}
6465
for _, o := range opts {
6566
o(s)
6667
}
@@ -77,7 +78,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
7778
var foundTail *rtp.Packet
7879

7980
for i := location.head; i != location.tail; i++ {
80-
if packet := s.buffer[i]; packet != nil {
81+
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
8182
foundHead = packet
8283

8384
break
@@ -89,7 +90,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
8990
}
9091

9192
for i := location.tail - 1; i != location.head; i-- {
92-
if packet := s.buffer[i]; packet != nil {
93+
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
9394
foundTail = packet
9495

9596
break
@@ -108,7 +109,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
108109
if location.empty() {
109110
return 0, false
110111
}
111-
packet := s.buffer[location.head]
112+
packet, _ := s.buffer.PeekAtSequence(location.head)
112113
if packet == nil {
113114
return 0, false
114115
}
@@ -118,7 +119,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
118119

119120
func (s *SampleBuilder) releasePacket(i uint16) {
120121
var p *rtp.Packet
121-
p, s.buffer[i] = s.buffer[i], nil
122+
p, _ = s.buffer.PopAtSequence(i)
122123
if p != nil && s.packetReleaseHandler != nil {
123124
s.packetReleaseHandler(p)
124125
}
@@ -183,7 +184,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
183184
// Push does not copy the input. If you wish to reuse
184185
// this memory make sure to copy before calling Push.
185186
func (s *SampleBuilder) Push(packet *rtp.Packet) {
186-
s.buffer[packet.SequenceNumber] = packet
187+
s.buffer.Push(packet)
187188

188189
switch s.filled.compare(packet.SequenceNumber) {
189190
case slCompareVoid:
@@ -226,15 +227,19 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
226227

227228
var consume sampleSequenceLocation
228229

229-
for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
230-
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
230+
for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
231+
pkt, err := s.buffer.PeekAtSequence(i)
232+
if pkt == nil || err != nil {
233+
break
234+
}
235+
if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
231236
consume.head = s.active.head
232237
consume.tail = i + 1
233238

234239
break
235240
}
236241
headTimestamp, hasData := s.fetchTimestamp(s.active)
237-
if hasData && s.buffer[i].Timestamp != headTimestamp {
242+
if hasData && pkt.Timestamp != headTimestamp {
238243
consume.head = s.active.head
239244
consume.tail = i
240245

@@ -245,8 +250,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
245250
if consume.empty() {
246251
return nil
247252
}
248-
249-
if !purgingBuffers && s.buffer[consume.tail] == nil {
253+
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
254+
if !purgingBuffers && pkt == nil {
250255
// wait for the next packet after this set of packets to arrive
251256
// to ensure at least one post sample timestamp is known
252257
// (unless we have to release right now)
@@ -258,9 +263,9 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
258263

259264
// scan for any packet after the current and use that time stamp as the diff point
260265
for i := consume.tail; i < s.active.tail; i++ {
261-
if s.buffer[i] != nil {
262-
afterTimestamp = s.buffer[i].Timestamp
263-
266+
pkt, _ := s.buffer.PeekAtSequence(i)
267+
if pkt != nil {
268+
afterTimestamp = pkt.Timestamp
264269
break
265270
}
266271
}
@@ -270,10 +275,12 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
270275

271276
// prior to decoding all the packets, check if this packet
272277
// would end being disposed anyway
273-
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
278+
pkt, err := s.buffer.PeekAtSequence(consume.head)
279+
if pkt != nil && err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
274280
isPadding := false
275281
for i := consume.head; i != consume.tail; i++ {
276-
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
282+
pkt, _ := s.buffer.PeekAtSequence(i)
283+
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
277284
isPadding = true
278285
}
279286
}
@@ -292,15 +299,23 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
292299
var metadata interface{}
293300
var rtpHeaders []*rtp.Header
294301
for i := consume.head; i != consume.tail; i++ {
295-
payload, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
302+
pkt, _ := s.buffer.PeekAtSequence(i)
303+
if pkt == nil {
304+
return nil
305+
}
306+
p, err := s.depacketizer.Unmarshal(pkt.Payload)
296307
if err != nil {
297308
return nil
298309
}
299310
if i == consume.head && s.packetHeadHandler != nil {
300311
metadata = s.packetHeadHandler(s.depacketizer)
301312
}
302313
if s.returnRTPHeaders {
303-
h := s.buffer[i].Header.Clone()
314+
pkt, err := s.buffer.PeekAtSequence(i)
315+
if err != nil {
316+
return nil
317+
}
318+
h := pkt.Header.Clone()
304319
rtpHeaders = append(rtpHeaders, &h)
305320
}
306321

pkg/media/samplebuilder/samplebuilder_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -427,10 +427,20 @@ func TestSampleBuilderCleanReference(t *testing.T) {
427427
fd.Push(pkt5)
428428

429429
for i := 0; i < 3; i++ {
430-
assert.Nilf(
431-
t, fd.buffer[(i+int(seqStart))%0x10000],
432-
"Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i,
433-
)
430+
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))
431+
432+
if pkt != nil || err == nil {
433+
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
434+
}
435+
}
436+
pkt, _ := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))
437+
if pkt != pkt4 {
438+
t.Error("New packet must be referenced after jump")
439+
}
440+
pkt, _ = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))
441+
442+
if pkt != pkt5 {
443+
t.Error("New packet must be referenced after jump")
434444
}
435445
assert.Equal(
436446
t, pkt4, fd.buffer[(14+int(seqStart))%0x10000],

0 commit comments

Comments
 (0)