Skip to content

Commit 3b75e53

Browse files
authored
feat(llc): video pause feature (#1063)
* Participant source added * changelog * changelog tweak * tweaks * source exported * update protocol model version * added missing participant source when reconnecting * video pause feature * fixes * added docs
1 parent 223a634 commit 3b75e53

File tree

19 files changed

+247
-7
lines changed

19 files changed

+247
-7
lines changed

packages/stream_video/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
✅ Added
44
* Extended `CallParticipantState` with `participantSource`. This indicates the participant's source channel (e.g., WebRTC, RTMP, WHIP) and can be used in filtering and sorting criteria.
55
* Livestream sorting preset now prioritises RTMP sources in layout sorting.
6+
* Automatic SFU-driven pausing of inbound video to save bandwidth and prevent visual artifacts:
7+
* New `SfuClientCapability.subscriberVideoPause` (on by default). Control via `Call.enableClientCapabilities()` / `Call.disableClientCapabilities()`.
8+
* New `SfuInboundStateNotificationEvent` notifies when inbound tracks are paused or resumed.
9+
* `CallParticipantState.pausedTracks` and `CallParticipantState.isTrackPaused()` let you check which tracks are currently paused.
610
* Added capability to remove a participant from a call via `call.kickUser()`. Requires the `kick-user` permission.
711

812
🐞 Fixed

packages/stream_video/lib/src/call/call.dart

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import '../models/models.dart';
2929
import '../platform_detector/platform_detector.dart';
3030
import '../retry/retry_policy.dart';
3131
import '../sfu/data/events/sfu_events.dart';
32+
import '../sfu/data/models/sfu_client_capability.dart';
3233
import '../sfu/data/models/sfu_error.dart';
3334
import '../sfu/data/models/sfu_track_type.dart';
3435
import '../shared_emitter.dart';
@@ -292,6 +293,9 @@ class Call {
292293
final Map<String, Timer> _reactionTimers = {};
293294
final Map<String, Timer> _captionsTimers = {};
294295
final List<CancelableOperation<void>> _sfuStatsTimers = [];
296+
final Set<SfuClientCapability> _sfuClientCapabilities = {
297+
SfuClientCapability.subscriberVideoPause, // on by default
298+
};
295299

296300
String get id => state.value.callId;
297301
StreamCallCid get callCid => state.value.callCid;
@@ -550,6 +554,28 @@ class Call {
550554
_stateManager.updateCallPreferences(preferences);
551555
}
552556

557+
/// Enables the given SFU client capabilities for this call.
558+
///
559+
/// Should be configured before `call.join()` is called. Changes made after
560+
/// joining will not affect the current session until the next join/reconnect.
561+
void enableClientCapabilities(
562+
List<SfuClientCapability> capabilities,
563+
) {
564+
_logger.i(() => '[enableClientCapabilities] $capabilities');
565+
capabilities.forEach(_sfuClientCapabilities.add);
566+
}
567+
568+
/// Disables the given SFU client capabilities for this call.
569+
///
570+
/// Should be configured before `call.join()` is called. Changes made after
571+
/// joining will not affect the current session until the next join/reconnect.
572+
void disableClientCapabilities(
573+
List<SfuClientCapability> capabilities,
574+
) {
575+
_logger.i(() => '[disableClientCapabilities] $capabilities');
576+
capabilities.forEach(_sfuClientCapabilities.remove);
577+
}
578+
553579
/// Accepts the incoming call.
554580
Future<Result<None>> accept() async {
555581
final state = this.state.value;
@@ -1182,6 +1208,7 @@ class Call {
11821208

11831209
final result = await session.start(
11841210
reconnectDetails: reconnectDetails,
1211+
capabilities: _sfuClientCapabilities,
11851212
onRtcManagerCreatedCallback: (_) async {
11861213
_logger.v(() => '[startSession] applying connect options');
11871214
await _applyConnectOptions();

packages/stream_video/lib/src/call/session/call_session.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import '../../errors/video_error.dart';
2020
import '../../errors/video_error_composer.dart';
2121
import '../../sfu/data/events/sfu_events.dart';
2222
import '../../sfu/data/models/sfu_call_state.dart';
23+
import '../../sfu/data/models/sfu_client_capability.dart';
2324
import '../../sfu/data/models/sfu_model_mapper_extensions.dart';
2425
import '../../sfu/data/models/sfu_subscription_details.dart';
2526
import '../../sfu/sfu_client.dart';
@@ -255,6 +256,7 @@ class CallSession extends Disposable {
255256
Duration fastReconnectDeadline,
256257
})>> start({
257258
sfu_events.ReconnectDetails? reconnectDetails,
259+
Set<SfuClientCapability> capabilities = const {},
258260
FutureOr<void> Function(RtcManager)? onRtcManagerCreatedCallback,
259261
bool isAnonymousUser = false,
260262
}) async {
@@ -331,6 +333,7 @@ class CallSession extends Disposable {
331333
reconnectDetails: reconnectDetails,
332334
preferredPublishOptions: preferredPublishOptions,
333335
preferredSubscribeOptions: preferredSubscribeOptions,
336+
capabilities: capabilities.map((c) => c.toDTO()).toList(),
334337
source:
335338
sfu_models.ParticipantSource.PARTICIPANT_SOURCE_WEBRTC_UNSPECIFIED,
336339
);
@@ -634,6 +637,8 @@ class CallSession extends Disposable {
634637
stateManager.sfuDominantSpeakerChanged(event);
635638
} else if (event is SfuPinsUpdatedEvent) {
636639
stateManager.sfuPinsUpdated(event.pins);
640+
} else if (event is SfuInboundStateNotificationEvent) {
641+
stateManager.sfuInboundStateNotification(event);
637642
}
638643
});
639644
}

packages/stream_video/lib/src/call/state/mixins/state_sfu_mixin.dart

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ mixin StateSfuMixin on StateNotifier<CallState> {
6969
...participant.publishedTracks,
7070
if (trackState != null) event.trackType: trackState,
7171
},
72+
pausedTracks: participant.pausedTracks
73+
.toList()
74+
.where((track) => track != event.trackType)
75+
.toSet(),
7276
);
7377
}
7478

@@ -267,4 +271,36 @@ mixin StateSfuMixin on StateNotifier<CallState> {
267271
],
268272
);
269273
}
274+
275+
void sfuInboundStateNotification(SfuInboundStateNotificationEvent event) {
276+
_logger.d(
277+
() => '[sfuInboundStateNotification] ${state.sessionId}; event: $event',
278+
);
279+
280+
state = state.copyWith(
281+
callParticipants: state.callParticipants.map((participant) {
282+
final inboundStates = event.inboundVideoStates.where((it) {
283+
return it.userId == participant.userId &&
284+
it.sessionId == participant.sessionId;
285+
}).toList();
286+
287+
if (inboundStates.isEmpty) {
288+
return participant;
289+
}
290+
291+
final pausedTracks = {...participant.pausedTracks};
292+
for (final inboundState in inboundStates) {
293+
if (inboundState.paused) {
294+
pausedTracks.add(inboundState.trackType);
295+
} else {
296+
pausedTracks.remove(inboundState.trackType);
297+
}
298+
}
299+
300+
return participant.copyWith(
301+
pausedTracks: pausedTracks,
302+
);
303+
}).toList(),
304+
);
305+
}
270306
}

packages/stream_video/lib/src/models/call_participant_state.dart

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class CallParticipantState
2525
required this.sessionId,
2626
required this.trackIdPrefix,
2727
this.publishedTracks = const {},
28+
this.pausedTracks = const {},
2829
this.isLocal = false,
2930
this.connectionQuality = SfuConnectionQuality.unspecified,
3031
this.isOnline = false,
@@ -49,6 +50,7 @@ class CallParticipantState
4950
required this.sessionId,
5051
required this.trackIdPrefix,
5152
required this.publishedTracks,
53+
required this.pausedTracks,
5254
required this.isLocal,
5355
required this.connectionQuality,
5456
required this.isOnline,
@@ -82,6 +84,14 @@ class CallParticipantState
8284
/// List of the last 10 audio levels.
8385
final List<double> audioLevels;
8486

87+
/// A list of tracks that are currently paused by our servers.
88+
/// Typically, a server-side pause happens when the local participant doesn't
89+
/// have enough bandwidth to receive all tracks. In this case, the server
90+
/// will pause some tracks to optimize the bandwidth usage.
91+
/// Once the bandwidth is restored, the server will resume the paused tracks.
92+
/// This is useful to avoid any unwanted video and audio artifacts.
93+
final Set<SfuTrackType> pausedTracks;
94+
8595
final bool isSpeaking;
8696
final bool isDominantSpeaker;
8797
final CallParticipantPin? pin;
@@ -105,6 +115,7 @@ class CallParticipantState
105115
String? sessionId,
106116
String? trackIdPrefix,
107117
Map<SfuTrackType, TrackState>? publishedTracks,
118+
Set<SfuTrackType>? pausedTracks,
108119
bool? isLocal,
109120
SfuConnectionQuality? connectionQuality,
110121
bool? isOnline,
@@ -127,6 +138,7 @@ class CallParticipantState
127138
sessionId: sessionId ?? this.sessionId,
128139
trackIdPrefix: trackIdPrefix ?? this.trackIdPrefix,
129140
publishedTracks: publishedTracks ?? this.publishedTracks,
141+
pausedTracks: pausedTracks ?? this.pausedTracks,
130142
isLocal: isLocal ?? this.isLocal,
131143
connectionQuality: connectionQuality ?? this.connectionQuality,
132144
isOnline: isOnline ?? this.isOnline,
@@ -173,6 +185,7 @@ class CallParticipantState
173185
sessionId: sessionId,
174186
trackIdPrefix: trackIdPrefix,
175187
publishedTracks: publishedTracks,
188+
pausedTracks: pausedTracks,
176189
isLocal: isLocal,
177190
connectionQuality: connectionQuality,
178191
isOnline: isOnline,
@@ -204,6 +217,7 @@ class CallParticipantState
204217
sessionId: sessionId,
205218
trackIdPrefix: trackIdPrefix,
206219
publishedTracks: publishedTracks,
220+
pausedTracks: pausedTracks,
207221
isLocal: isLocal,
208222
connectionQuality: connectionQuality,
209223
isOnline: isOnline,
@@ -242,6 +256,7 @@ class CallParticipantState
242256
'sessionId: $sessionId, '
243257
'trackId: $trackIdPrefix, image: $image, '
244258
'publishedTracks: $publishedTracks, '
259+
'pausedTracks: $pausedTracks, '
245260
'isLocal: $isLocal, '
246261
'connectionQuality: $connectionQuality, isOnline: $isOnline, '
247262
'audioLevel: $audioLevel, audioLevels: $audioLevels, isSpeaking: $isSpeaking, '
@@ -261,6 +276,7 @@ class CallParticipantState
261276
sessionId,
262277
trackIdPrefix,
263278
publishedTracks,
279+
pausedTracks,
264280
isLocal,
265281
connectionQuality,
266282
isOnline,
@@ -299,6 +315,10 @@ class CallParticipantState
299315
return !(screenShareTrack?.muted ?? true);
300316
}
301317

318+
bool isTrackPaused(SfuTrackType trackType) {
319+
return pausedTracks.contains(trackType);
320+
}
321+
302322
UserInfo toUserInfo() => UserInfo(
303323
id: userId,
304324
role: roles.firstOrNull ?? '',

packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import '../models/sfu_connection_info.dart';
1111
import '../models/sfu_connection_quality.dart';
1212
import '../models/sfu_error.dart';
1313
import '../models/sfu_goaway_reason.dart';
14+
import '../models/sfu_inbound_video_state.dart';
1415
import '../models/sfu_model_mapper_extensions.dart';
1516
import '../models/sfu_participant.dart';
1617
import '../models/sfu_participant_source.dart';
1718
import '../models/sfu_pin.dart';
1819
import '../models/sfu_publish_options.dart';
20+
import '../models/sfu_client_capability.dart';
1921
import '../models/sfu_track_type.dart';
2022
import '../models/sfu_video_layer_setting.dart';
2123
import '../models/sfu_video_sender.dart';
@@ -178,6 +180,18 @@ extension SfuEventMapper on sfu_events.SfuEvent {
178180
return SfuCallEndedEvent(
179181
callEndedReason: payload.reason.toDomain(),
180182
);
183+
case sfu_events.SfuEvent_EventPayload.inboundStateNotification:
184+
final payload = inboundStateNotification;
185+
return SfuInboundStateNotificationEvent(
186+
inboundVideoStates: payload.inboundVideoStates
187+
.map((s) => SfuInboundVideoState(
188+
userId: s.userId,
189+
sessionId: s.sessionId,
190+
trackType: s.trackType.toDomain(),
191+
paused: s.paused,
192+
))
193+
.toList(),
194+
);
181195
case sfu_events.SfuEvent_EventPayload.participantUpdated:
182196
final payload = participantUpdated;
183197
return SfuParticipantUpdatedEvent(
@@ -192,6 +206,18 @@ extension SfuEventMapper on sfu_events.SfuEvent {
192206
}
193207
}
194208

209+
extension SfuClientCapabilityExtension on sfu_models.ClientCapability {
210+
SfuClientCapability toDomain() {
211+
switch (this) {
212+
case sfu_models.ClientCapability.CLIENT_CAPABILITY_SUBSCRIBER_VIDEO_PAUSE:
213+
return SfuClientCapability.subscriberVideoPause;
214+
case sfu_models.ClientCapability.CLIENT_CAPABILITY_UNSPECIFIED:
215+
default:
216+
return SfuClientCapability.unspecified;
217+
}
218+
}
219+
}
220+
195221
/// TODO
196222
extension SfuCallStateExtension on sfu_models.CallState {
197223
SfuCallState toDomain() {

packages/stream_video/lib/src/sfu/data/events/sfu_events.dart

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import '../models/sfu_call_state.dart';
1212
import '../models/sfu_connection_info.dart';
1313
import '../models/sfu_error.dart';
1414
import '../models/sfu_goaway_reason.dart';
15+
import '../models/sfu_inbound_video_state.dart';
1516
import '../models/sfu_participant.dart';
1617
import '../models/sfu_pin.dart';
1718
import '../models/sfu_publish_options.dart';
@@ -266,6 +267,22 @@ class SfuCallGrantsUpdated extends SfuEvent {
266267
List<Object> get props => [currentGrants, message];
267268
}
268269

270+
class SfuInboundStateNotificationEvent extends SfuEvent {
271+
const SfuInboundStateNotificationEvent({
272+
required this.inboundVideoStates,
273+
});
274+
275+
final List<SfuInboundVideoState> inboundVideoStates;
276+
277+
@override
278+
List<Object> get props => [inboundVideoStates];
279+
280+
@override
281+
String toString() {
282+
return 'SfuInboundStateNotificationEvent{inboundVideoStates: $inboundVideoStates}';
283+
}
284+
}
285+
269286
class SfuErrorEvent extends SfuEvent {
270287
const SfuErrorEvent({required this.error});
271288

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
enum SfuClientCapability {
2+
unspecified,
3+
subscriberVideoPause;
4+
5+
@override
6+
String toString() {
7+
return name;
8+
}
9+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import 'package:equatable/equatable.dart';
2+
3+
import 'sfu_track_type.dart';
4+
5+
class SfuInboundVideoState with EquatableMixin {
6+
const SfuInboundVideoState({
7+
required this.userId,
8+
required this.sessionId,
9+
required this.trackType,
10+
required this.paused,
11+
});
12+
13+
final String userId;
14+
final String sessionId;
15+
final SfuTrackType trackType;
16+
final bool paused;
17+
18+
@override
19+
List<Object?> get props => [userId, sessionId, trackType, paused];
20+
21+
@override
22+
String toString() {
23+
return 'SfuInboundVideoState{userId: $userId, sessionId: $sessionId, trackType: $trackType, paused: $paused}';
24+
}
25+
}

packages/stream_video/lib/src/sfu/data/models/sfu_model_mapper_extensions.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import '../../../../protobuf/video/sfu/models/models.pb.dart' as sfu_models;
22
import '../../../../protobuf/video/sfu/signal_rpc/signal.pb.dart' as sfu;
33
import '../../../webrtc/model/rtc_video_encoding.dart';
44
import '../../../webrtc/peer_type.dart';
5+
import 'sfu_client_capability.dart';
56
import 'sfu_codec.dart';
67
import 'sfu_publish_options.dart';
78
import 'sfu_subscription_details.dart';
@@ -21,6 +22,18 @@ extension SfuPeerTypeMapper on sfu_models.PeerType {
2122
}
2223
}
2324

25+
extension SfuClientCapabilityMapper on SfuClientCapability {
26+
sfu_models.ClientCapability toDTO() {
27+
switch (this) {
28+
case SfuClientCapability.subscriberVideoPause:
29+
return sfu_models
30+
.ClientCapability.CLIENT_CAPABILITY_SUBSCRIBER_VIDEO_PAUSE;
31+
case SfuClientCapability.unspecified:
32+
return sfu_models.ClientCapability.CLIENT_CAPABILITY_UNSPECIFIED;
33+
}
34+
}
35+
}
36+
2437
extension SfuTrackTypeMapper on SfuTrackType {
2538
sfu_models.TrackType toDTO() {
2639
if (this == SfuTrackType.audio) {

0 commit comments

Comments
 (0)