Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/stream_video/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Unreleased

* Improved SFU stats implementation.

## 1.0.1

### ✅ Added
Expand Down
19 changes: 12 additions & 7 deletions packages/stream_video/lib/src/call/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,12 @@ class Call {
networkMonitor: networkMonitor,
streamVideo: _streamVideo,
statsOptions: _sfuStatsOptions!,
leftoverTraceRecords: _previousSession?.getTrace().snapshot ?? [],
leftoverTraceRecords:
_previousSession
?.getTrace()
.expand((slice) => slice.snapshot)
.toList() ??
const [],
onReconnectionNeeded: (pc, strategy) {
_session?.trace('pc_reconnection_needed', {
'peerConnectionId': pc.type.name,
Expand Down Expand Up @@ -1481,8 +1486,8 @@ class Call {
return;
}

_session?.trace('call_reconnect', {
'strategy': _reconnectStrategy.name,
_session?.trace('callReconnect', {
'strategy': strategy,
});

_stateManager.lifecycleCallConnecting(
Expand Down Expand Up @@ -1531,8 +1536,8 @@ class Call {
await _reconnectMigrate();
}

_session?.trace('call_reconnect_success', {
'strategy': _reconnectStrategy.name,
_session?.trace('callReconnectSuccess', {
'strategy': strategy,
});
} catch (error) {
switch (error) {
Expand All @@ -1541,8 +1546,8 @@ class Call {
_logger.w(() => '[reconnect] unrecoverable error');
_stateManager.lifecycleCallReconnectingFailed();

_session?.trace('call_reconnect_failed', {
'strategy': _reconnectStrategy.name,
_session?.trace('callReconnectFailed', {
'strategy': strategy,
'error': error.toString(),
});

Expand Down
23 changes: 18 additions & 5 deletions packages/stream_video/lib/src/call/session/call_session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const _debounceDuration = Duration(milliseconds: 200);
const _migrationCompleteEventTimeout = Duration(seconds: 7);

class CallSession extends Disposable {
bool _isLeavingOrClosed = false;
CallSession({
required this.callCid,
required this.sessionSeq,
Expand Down Expand Up @@ -104,6 +105,7 @@ class CallSession extends Disposable {
final InternetConnection networkMonitor;
final StatsOptions statsOptions;
final Tracer _tracer;
final Tracer _zonedTracer = Tracer(null);
final StreamVideo _streamVideo;

final Duration joinResponseTimeout;
Expand Down Expand Up @@ -132,12 +134,13 @@ class CallSession extends Disposable {
onCancel: () => Result.error('UpdateViewportVisibility cancelled'),
);

TraceSlice getTrace() {
return _tracer.take();
List<TraceSlice> getTrace() {
return [_tracer.take(), _zonedTracer.take()];
}

void setTraceEnabled(bool enabled) {
_tracer.setEnabled(enabled);
_zonedTracer.setEnabled(enabled);
}

void trace(String tag, dynamic data) {
Expand Down Expand Up @@ -459,6 +462,12 @@ class CallSession extends Disposable {
fastReconnectDeadline: event.fastReconnectDeadline,
),
);
} on TimeoutException catch (e, stk) {
final message =
'Waiting for "joinResponse" has timed out after ${joinResponseTimeout}ms';
_tracer.trace('joinRequestTimeout', message);
_logger.e(() => '[start] failed: $e');
return Result.failure(VideoErrors.compose(e, stk));
} catch (e, stk) {
_logger.e(() => '[start] failed: $e');
return Result.failure(VideoErrors.compose(e, stk));
Expand Down Expand Up @@ -585,6 +594,7 @@ class CallSession extends Disposable {

void leave({String? reason}) {
_logger.d(() => '[leave] no args');
_isLeavingOrClosed = true;
sfuWS.leave(sessionId: sessionId, reason: reason);
}

Expand All @@ -593,6 +603,7 @@ class CallSession extends Disposable {
String? closeReason,
}) async {
_logger.d(() => '[close] code: $code, closeReason: $closeReason');
_isLeavingOrClosed = true;

await _eventsSubscription?.cancel();
await _networkStatusSubscription?.cancel();
Expand All @@ -614,6 +625,7 @@ class CallSession extends Disposable {
@override
Future<void> dispose() async {
_logger.d(() => '[dispose] no args');
_isLeavingOrClosed = true;

await close(StreamWebSocketCloseCode.normalClosure);
return await super.dispose();
Expand Down Expand Up @@ -683,6 +695,7 @@ class CallSession extends Disposable {
} else if (event is SfuParticipantLeftEvent) {
stateManager.sfuParticipantLeft(event);
} else if (event is SfuConnectionQualityChangedEvent) {
_tracer.trace('ConnectionQualityChanged', event.toJson());
stateManager.sfuConnectionQualityChanged(event);
} else if (event is SfuAudioLevelChangedEvent) {
stateManager.sfuUpdateAudioLevelChanged(event);
Expand Down Expand Up @@ -905,7 +918,7 @@ class CallSession extends Disposable {
}

Future<void> _onRenegotiationNeeded(StreamPeerConnection pc) async {
if (stateManager.callState.status.isDisconnected) {
if (_isLeavingOrClosed || stateManager.callState.status.isDisconnected) {
_logger.w(() => '[negotiate] call is disconnected');
return;
}
Expand Down Expand Up @@ -1037,7 +1050,7 @@ class CallSession extends Disposable {
}

final result = TracerZone.run(
_tracer,
_zonedTracer,
++zonedTracerSeq,
() async {
return rtcManager.setCameraEnabled(
Expand All @@ -1060,7 +1073,7 @@ class CallSession extends Disposable {
}

final result = TracerZone.run(
_tracer,
_zonedTracer,
++zonedTracerSeq,
() async {
return rtcManager.setMicrophoneEnabled(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,17 @@ class CallSessionFactory {

_logger.v(() => '[makeCallSession] sfuName: $sfuName, sfuUrl: $sfuUrl');

final tracer = Tracer('$sessionSeq')
final tracer = Tracer('$sessionSeq-$sfuName')
..setEnabled(statsOptions.enableRtcStats)
..traceMultiple(
leftoverTraceRecords
.map(
(r) => r.copyWith(
id: '${max(0, sessionSeq - 1)}',
id: '${max(0, sessionSeq - 1)}-$sfuName',
),
)
.toList(),
)
..trace('create', {'url': sfuName});
);

return CallSession(
sessionSeq: sessionSeq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,14 @@ class SfuStatsReporter {
final publisherTrace = callSession.rtcManager?.publisher?.tracer
.take();
final sessionTrace = callSession.getTrace();
final mediaDeviceNotifierTrace = RtcMediaDeviceNotifier.instance
.getTrace();

traces.addAll([
if (subscriberTrace != null) subscriberTrace,
if (publisherTrace != null) publisherTrace,
sessionTrace,
...sessionTrace,
mediaDeviceNotifierTrace,
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class StatsReporter extends StateNotifier<CallMetrics?> {
return Stream.periodic(interval, (tick) => (collectStats(), tick)).asyncMap(
(data) async {
final stats = await data.$1;
if (!mounted) return stats;

unawaited(_processStats(stats, data.$2));
return stats;
},
Expand Down Expand Up @@ -92,6 +94,8 @@ class StatsReporter extends StateNotifier<CallMetrics?> {
stats,
int tick,
) async {
if (!mounted) return;

var publisherStats = state?.publisher ?? PeerConnectionStats.empty();
var subscriberStats = state?.subscriber ?? PeerConnectionStats.empty();

Expand Down
Loading