diff --git a/build.yaml b/build.yaml index 02bf52e6..280d04e1 100644 --- a/build.yaml +++ b/build.yaml @@ -19,6 +19,10 @@ targets: - example/** builders: json_serializable: + generate_for: + include: + - lib/src/json/**.dart + - lib/src/token_source/**.dart options: include_if_null: false explicit_to_json: true diff --git a/lib/livekit_client.dart b/lib/livekit_client.dart index 88f460c5..a0f6b21c 100644 --- a/lib/livekit_client.dart +++ b/lib/livekit_client.dart @@ -29,6 +29,15 @@ export 'src/livekit.dart'; export 'src/logger.dart'; export 'src/managers/event.dart'; export 'src/options.dart'; +export 'src/agent/agent.dart'; +export 'src/agent/session.dart'; +export 'src/agent/session_options.dart'; +export 'src/agent/chat/message.dart'; +export 'src/agent/chat/message_sender.dart'; +export 'src/agent/chat/message_receiver.dart'; +export 'src/agent/chat/text_message_sender.dart'; +export 'src/agent/chat/transcription_stream_receiver.dart'; +export 'src/agent/room_agent.dart'; export 'src/participant/local.dart'; export 'src/participant/participant.dart'; export 'src/participant/remote.dart' hide ParticipantCreationResult; @@ -48,7 +57,7 @@ export 'src/track/remote/audio.dart'; export 'src/track/remote/remote.dart'; export 'src/track/remote/video.dart'; export 'src/track/track.dart'; -export 'src/types/attribute_typings.dart'; +export 'src/json/agent_attributes.dart'; export 'src/types/data_stream.dart'; export 'src/types/other.dart'; export 'src/types/participant_permissions.dart'; diff --git a/lib/src/agent/agent.dart b/lib/src/agent/agent.dart new file mode 100644 index 00000000..4095628d --- /dev/null +++ b/lib/src/agent/agent.dart @@ -0,0 +1,233 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'package:flutter/foundation.dart'; + +import 'package:collection/collection.dart'; + +import '../json/agent_attributes.dart'; +import '../participant/participant.dart'; +import '../participant/remote.dart'; +import '../track/remote/audio.dart'; +import '../track/remote/video.dart'; +import '../types/other.dart'; +import 'constants.dart'; + +/// Represents a LiveKit Agent. +/// +/// The [Agent] class models the state of a LiveKit agent within a [Session]. +/// It exposes information about the agent's connection status, conversational +/// state, and the media tracks that belong to the agent. Consumers should +/// observe [Agent] to update their UI when the agent connects, disconnects, +/// or transitions between conversational states such as listening, thinking, +/// and speaking. +/// +/// The associated [Participant]'s attributes are inspected to derive the +/// agent-specific metadata (such as [agentState]). Audio and avatar video +/// tracks are picked from the agent participant and its associated avatar +/// worker (if any). +class Agent extends ChangeNotifier { + Agent(); + + AgentFailure? get error => _error; + AgentFailure? _error; + + /// The current conversational state of the agent. + AgentState? get agentState => _agentState; + AgentState? _agentState; + + /// The agent's audio track, if available. + RemoteAudioTrack? get audioTrack => _audioTrack; + RemoteAudioTrack? _audioTrack; + + /// The agent's avatar video track, if available. + RemoteVideoTrack? get avatarVideoTrack => _avatarVideoTrack; + RemoteVideoTrack? _avatarVideoTrack; + + /// Indicates whether the agent is connected and ready for conversation. + bool get isConnected { + if (_state != _AgentLifecycle.connected) { + return false; + } + return switch (_agentState) { + AgentState.LISTENING || AgentState.THINKING || AgentState.SPEAKING => true, + _ => false, + }; + } + + /// Whether the agent is buffering audio prior to connecting. + bool get isBuffering => _state == _AgentLifecycle.connecting && _isBuffering; + + /// Whether the agent can currently listen for user input. + bool get canListen { + if (_state == _AgentLifecycle.connecting) { + return _isBuffering; + } + if (_state == _AgentLifecycle.connected) { + return switch (_agentState) { + AgentState.LISTENING || AgentState.THINKING || AgentState.SPEAKING => true, + _ => false, + }; + } + return false; + } + + /// Whether the agent is pending initialization. + bool get isPending { + if (_state == _AgentLifecycle.connecting) { + return !_isBuffering; + } + if (_state == _AgentLifecycle.connected) { + return switch (_agentState) { + AgentState.IDLE || AgentState.INITIALIZING => true, + _ => false, + }; + } + return false; + } + + /// Whether the agent finished or failed its session. + bool get isFinished => _state == _AgentLifecycle.disconnected || _state == _AgentLifecycle.failed; + + _AgentLifecycle _state = _AgentLifecycle.disconnected; + bool _isBuffering = false; + + /// Marks the agent as disconnected. + void disconnected() { + if (_state == _AgentLifecycle.disconnected && + _agentState == null && + _audioTrack == null && + _avatarVideoTrack == null && + _error == null) { + return; + } + _state = _AgentLifecycle.disconnected; + _isBuffering = false; + _agentState = null; + _audioTrack = null; + _avatarVideoTrack = null; + _error = null; + notifyListeners(); + } + + /// Marks the agent as connecting. + void connecting({required bool buffering}) { + _state = _AgentLifecycle.connecting; + _isBuffering = buffering; + _error = null; + notifyListeners(); + } + + /// Marks the agent as failed. + void failed(AgentFailure failure) { + _state = _AgentLifecycle.failed; + _isBuffering = false; + _error = failure; + notifyListeners(); + } + + /// Updates the agent with information from the connected [participant]. + void connected(RemoteParticipant participant) { + final AgentState? nextAgentState = _readAgentState(participant); + final RemoteAudioTrack? nextAudioTrack = _resolveAudioTrack(participant); + final RemoteVideoTrack? nextAvatarTrack = _resolveAvatarVideoTrack(participant); + + final bool shouldNotify = _state != _AgentLifecycle.connected || + _agentState != nextAgentState || + !identical(_audioTrack, nextAudioTrack) || + !identical(_avatarVideoTrack, nextAvatarTrack) || + _error != null || + _isBuffering; + + _state = _AgentLifecycle.connected; + _isBuffering = false; + _error = null; + _agentState = nextAgentState; + _audioTrack = nextAudioTrack; + _avatarVideoTrack = nextAvatarTrack; + + if (shouldNotify) { + notifyListeners(); + } + } + + AgentState? _readAgentState(Participant participant) { + final rawState = participant.attributes[lkAgentStateAttributeKey]; + if (rawState == null) { + return null; + } + switch (rawState) { + case 'idle': + return AgentState.IDLE; + case 'initializing': + return AgentState.INITIALIZING; + case 'listening': + return AgentState.LISTENING; + case 'speaking': + return AgentState.SPEAKING; + case 'thinking': + return AgentState.THINKING; + default: + return null; + } + } + + RemoteAudioTrack? _resolveAudioTrack(RemoteParticipant participant) { + final publication = participant.audioTrackPublications.firstWhereOrNull( + (pub) => pub.source == TrackSource.microphone, + ); + return publication?.track; + } + + RemoteVideoTrack? _resolveAvatarVideoTrack(RemoteParticipant participant) { + final avatarWorker = _findAvatarWorker(participant); + if (avatarWorker == null) { + return null; + } + final publication = avatarWorker.videoTrackPublications.firstWhereOrNull( + (pub) => pub.source == TrackSource.camera, + ); + return publication?.track; + } + + RemoteParticipant? _findAvatarWorker(RemoteParticipant participant) { + final publishOnBehalf = participant.identity; + final room = participant.room; + return room.remoteParticipants.values.firstWhereOrNull( + (p) => p.attributes[lkPublishOnBehalfAttributeKey] == publishOnBehalf, + ); + } +} + +/// Describes why an [Agent] failed to connect. +enum AgentFailure { + /// The agent did not connect within the allotted timeout. + timeout, + + /// The agent left the room unexpectedly. + left; + + /// A human-readable error message. + String get message => switch (this) { + AgentFailure.timeout => 'Agent did not connect', + AgentFailure.left => 'Agent left the room unexpectedly', + }; +} + +enum _AgentLifecycle { + disconnected, + connecting, + connected, + failed, +} diff --git a/lib/src/agent/chat/message.dart b/lib/src/agent/chat/message.dart new file mode 100644 index 00000000..b744fee2 --- /dev/null +++ b/lib/src/agent/chat/message.dart @@ -0,0 +1,130 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'package:flutter/foundation.dart'; + +/// A message received from the agent. +@immutable +class ReceivedMessage { + const ReceivedMessage({ + required this.id, + required this.timestamp, + required this.content, + }); + + final String id; + final DateTime timestamp; + final ReceivedMessageContent content; + + ReceivedMessage copyWith({ + String? id, + DateTime? timestamp, + ReceivedMessageContent? content, + }) { + return ReceivedMessage( + id: id ?? this.id, + timestamp: timestamp ?? this.timestamp, + content: content ?? this.content, + ); + } + + @override + bool operator ==(Object other) { + if (identical(this, other)) return true; + return other is ReceivedMessage && other.id == id && other.timestamp == timestamp && other.content == content; + } + + @override + int get hashCode => Object.hash(id, timestamp, content); +} + +/// Base class for message content types that can be received from the agent. +sealed class ReceivedMessageContent { + const ReceivedMessageContent(); + + /// Textual representation of the content. + String get text; +} + +/// A transcript emitted by the agent. +class AgentTranscript extends ReceivedMessageContent { + const AgentTranscript(this.text); + + @override + final String text; + + @override + bool operator ==(Object other) => other is AgentTranscript && other.text == text; + + @override + int get hashCode => text.hashCode; +} + +/// A transcript emitted for the user (e.g., speech-to-text). +class UserTranscript extends ReceivedMessageContent { + const UserTranscript(this.text); + + @override + final String text; + + @override + bool operator ==(Object other) => other is UserTranscript && other.text == text; + + @override + int get hashCode => text.hashCode; +} + +/// A message that originated from user input (loopback). +class UserInput extends ReceivedMessageContent { + const UserInput(this.text); + + @override + final String text; + + @override + bool operator ==(Object other) => other is UserInput && other.text == text; + + @override + int get hashCode => text.hashCode; +} + +/// A message sent to the agent. +@immutable +class SentMessage { + const SentMessage({ + required this.id, + required this.timestamp, + required this.content, + }); + + final String id; + final DateTime timestamp; + final SentMessageContent content; +} + +/// Base class for message content types that can be sent to the agent. +sealed class SentMessageContent { + const SentMessageContent(); + + /// Textual representation of the content. + String get text; +} + +/// User-provided text input. +class SentUserInput extends SentMessageContent { + const SentUserInput(this.text); + + @override + final String text; +} diff --git a/lib/src/agent/chat/message_receiver.dart b/lib/src/agent/chat/message_receiver.dart new file mode 100644 index 00000000..9c47825c --- /dev/null +++ b/lib/src/agent/chat/message_receiver.dart @@ -0,0 +1,24 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import 'message.dart'; + +/// Receives messages produced by the agent. +abstract class MessageReceiver { + Stream messages(); + + Future dispose(); +} diff --git a/lib/src/agent/chat/message_sender.dart b/lib/src/agent/chat/message_sender.dart new file mode 100644 index 00000000..a9559741 --- /dev/null +++ b/lib/src/agent/chat/message_sender.dart @@ -0,0 +1,20 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'message.dart'; + +/// Sends messages to the agent. +abstract class MessageSender { + Future send(SentMessage message); +} diff --git a/lib/src/agent/chat/text_message_sender.dart b/lib/src/agent/chat/text_message_sender.dart new file mode 100644 index 00000000..f83f73c1 --- /dev/null +++ b/lib/src/agent/chat/text_message_sender.dart @@ -0,0 +1,85 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import 'package:uuid/uuid.dart'; + +import '../../core/room.dart'; +import '../../participant/local.dart'; +import '../../types/data_stream.dart'; +import 'message.dart'; +import 'message_receiver.dart'; +import 'message_sender.dart'; + +/// Sends text messages to the agent and emits a loopback message so the UI +/// can reflect the user's input immediately. +class TextMessageSender implements MessageSender, MessageReceiver { + TextMessageSender({ + required Room room, + this.topic = 'lk.chat', + }) : _room = room { + _controller = StreamController.broadcast(); + } + + final Room _room; + final String topic; + late final StreamController _controller; + + @override + Stream messages() => _controller.stream; + + @override + Future dispose() async { + await _controller.close(); + } + + @override + Future send(SentMessage message) async { + final content = message.content; + if (content is! SentUserInput) { + return; + } + + final LocalParticipant? localParticipant = _room.localParticipant; + if (localParticipant == null) { + throw StateError('Cannot send a message before connecting to the room.'); + } + + await localParticipant.sendText( + content.text, + options: SendTextOptions(topic: topic), + ); + + if (!_controller.isClosed) { + _controller.add( + ReceivedMessage( + id: message.id, + timestamp: message.timestamp, + content: UserInput(content.text), + ), + ); + } + } + + /// Convenience helper for sending text without constructing a [SentMessage]. + Future sendText(String text) { + final message = SentMessage( + id: const Uuid().v4(), + timestamp: DateTime.timestamp(), + content: SentUserInput(text), + ); + return send(message); + } +} diff --git a/lib/src/agent/chat/transcription_stream_receiver.dart b/lib/src/agent/chat/transcription_stream_receiver.dart new file mode 100644 index 00000000..0c4a6011 --- /dev/null +++ b/lib/src/agent/chat/transcription_stream_receiver.dart @@ -0,0 +1,290 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; + +import '../../core/room.dart'; +import '../../data_stream/stream_reader.dart'; +import '../../logger.dart'; +import '../../types/data_stream.dart'; +import 'message.dart'; +import 'message_receiver.dart'; + +/// Converts LiveKit transcription text streams into [ReceivedMessage]s. +/// +/// Each stream corresponds to a single message (agent or user). The stream +/// yields textual updates which are aggregated until the message is finalized. +/// When a new message for the same participant arrives, previous partial +/// content is purged so that memory usage remains bounded. +class TranscriptionStreamReceiver implements MessageReceiver { + TranscriptionStreamReceiver({ + required Room room, + this.topic = 'lk.transcription', + }) : _room = room; + + final Room _room; + final String topic; + + StreamController? _controller; + bool _registered = false; + bool _controllerClosed = false; + + final Map<_PartialMessageId, _PartialMessage> _partialMessages = HashMap(); + + @override + Stream messages() { + if (_controller != null) { + return _controller!.stream; + } + + _controller = StreamController.broadcast( + onListen: _registerHandler, + onCancel: _handleCancel, + ); + _controllerClosed = false; + return _controller!.stream; + } + + void _registerHandler() { + if (_registered) { + return; + } + _registered = true; + + _room.registerTextStreamHandler(topic, (TextStreamReader reader, String participantIdentity) { + reader.listen( + (chunk) { + final info = reader.info; + if (info == null) { + logger.warning('Received transcription chunk without metadata.'); + return; + } + + if (chunk.content.isEmpty) { + return; + } + + final String text; + try { + text = utf8.decode(chunk.content); + } catch (error) { + logger.warning('Failed to decode transcription chunk: $error'); + return; + } + + if (text.isEmpty) { + return; + } + + final message = _processIncoming( + text, + info, + participantIdentity, + ); + if (!_controller!.isClosed) { + _controller!.add(message); + } + }, + onError: (Object error, StackTrace stackTrace) { + if (!_controller!.isClosed) { + _controller!.addError(error, stackTrace); + } + }, + onDone: () { + final info = reader.info; + if (info != null) { + final attributes = info.attributes; + final segmentId = _extractSegmentId(attributes, info.id); + final key = _PartialMessageId(segmentId: segmentId, participantId: participantIdentity); + _partialMessages.remove(key); + } + }, + cancelOnError: true, + ); + }); + } + + void _handleCancel() { + if (_registered) { + _room.unregisterTextStreamHandler(topic); + _registered = false; + } + _partialMessages.clear(); + if (_controllerClosed) { + return; + } + _controllerClosed = true; + final controller = _controller; + _controller = null; + if (controller != null) { + unawaited(controller.close()); + } + } + + ReceivedMessage _processIncoming( + String chunk, + TextStreamInfo info, + String participantIdentity, + ) { + final attributes = _TranscriptionAttributes.from(info.attributes); + final segmentId = _extractSegmentId(info.attributes, info.id); + final key = _PartialMessageId(segmentId: segmentId, participantId: participantIdentity); + final currentStreamId = info.id; + + final DateTime timestamp = DateTime.fromMillisecondsSinceEpoch(info.timestamp, isUtc: true).toLocal(); + + final existing = _partialMessages[key]; + if (existing != null) { + if (existing.streamId == currentStreamId) { + existing.append(chunk); + } else { + existing.replace(chunk, currentStreamId); + } + } else { + _partialMessages[key] = _PartialMessage( + content: chunk, + timestamp: timestamp, + streamId: currentStreamId, + ); + _cleanupPreviousTurn(participantIdentity, segmentId); + } + + final currentPartial = _partialMessages[key]; + + if (attributes.isFinal == true) { + _partialMessages.remove(key); + } + + final partial = attributes.isFinal == true ? currentPartial : _partialMessages[key]; + final displayContent = partial?.content ?? chunk; + final displayTimestamp = partial?.timestamp ?? timestamp; + final isLocalParticipant = _room.localParticipant?.identity == participantIdentity; + + final ReceivedMessageContent content = + isLocalParticipant ? UserTranscript(displayContent) : AgentTranscript(displayContent); + + return ReceivedMessage( + id: segmentId, + timestamp: displayTimestamp, + content: content, + ); + } + + void _cleanupPreviousTurn(String participantId, String currentSegmentId) { + final keysToRemove = _partialMessages.keys + .where((key) => key.participantId == participantId && key.segmentId != currentSegmentId) + .toList(growable: false); + + for (final key in keysToRemove) { + _partialMessages.remove(key); + } + } + + String _extractSegmentId(Map attributes, String fallback) { + return attributes[_AttributeKeys.segmentId] ?? fallback; + } + + @override + Future dispose() async { + if (_registered) { + _room.unregisterTextStreamHandler(topic); + _registered = false; + } + _partialMessages.clear(); + if (!_controllerClosed) { + _controllerClosed = true; + final controller = _controller; + _controller = null; + if (controller != null) { + await controller.close(); + } + } + } +} + +class _PartialMessageId { + _PartialMessageId({ + required this.segmentId, + required this.participantId, + }); + + final String segmentId; + final String participantId; + + @override + bool operator ==(Object other) => + other is _PartialMessageId && other.segmentId == segmentId && other.participantId == participantId; + + @override + int get hashCode => Object.hash(segmentId, participantId); +} + +class _PartialMessage { + _PartialMessage({ + required this.content, + required this.timestamp, + required this.streamId, + }); + + String content; + DateTime timestamp; + String streamId; + + void append(String chunk) { + content += chunk; + } + + void replace(String chunk, String newStreamId) { + content = chunk; + streamId = newStreamId; + } +} + +class _TranscriptionAttributes { + _TranscriptionAttributes({ + required this.segmentId, + required this.isFinal, + }); + + final String? segmentId; + final bool? isFinal; + + static _TranscriptionAttributes from(Map attributes) { + return _TranscriptionAttributes( + segmentId: attributes[_AttributeKeys.segmentId], + isFinal: _parseBool(attributes[_AttributeKeys.transcriptionFinal]), + ); + } + + static bool? _parseBool(String? value) { + if (value == null) { + return null; + } + final normalized = value.toLowerCase(); + if (normalized == 'true' || normalized == '1') { + return true; + } + if (normalized == 'false' || normalized == '0') { + return false; + } + return null; + } +} + +class _AttributeKeys { + static const segmentId = 'lk.segment_id'; + static const transcriptionFinal = 'lk.transcription_final'; +} diff --git a/lib/src/agent/constants.dart b/lib/src/agent/constants.dart new file mode 100644 index 00000000..2a6abc0a --- /dev/null +++ b/lib/src/agent/constants.dart @@ -0,0 +1,20 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Participant attribute keys used for agent metadata. +const lkAgentStateAttributeKey = 'lk.agent.state'; +const lkPublishOnBehalfAttributeKey = 'lk.publish_on_behalf'; +const lkAgentNameAttributeKey = 'lk.agent_name'; +const lkAgentInputsAttributeKey = 'lk.agent.inputs'; +const lkAgentOutputsAttributeKey = 'lk.agent.outputs'; diff --git a/lib/src/agent/room_agent.dart b/lib/src/agent/room_agent.dart new file mode 100644 index 00000000..f4d5473f --- /dev/null +++ b/lib/src/agent/room_agent.dart @@ -0,0 +1,36 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'package:collection/collection.dart'; + +import '../core/room.dart'; +import '../participant/remote.dart'; +import '../types/other.dart'; +import 'constants.dart'; + +extension AgentRoom on Room { + /// All agent participants currently in the room. + Iterable get agentParticipants => remoteParticipants.values.where( + (participant) { + if (participant.kind != ParticipantKind.AGENT) { + return false; + } + final publishOnBehalf = participant.attributes[lkPublishOnBehalfAttributeKey]; + return publishOnBehalf == null || publishOnBehalf.isEmpty; + }, + ); + + /// The first agent participant in the room, if one exists. + RemoteParticipant? get agentParticipant => agentParticipants.firstOrNull; +} diff --git a/lib/src/agent/session.dart b/lib/src/agent/session.dart new file mode 100644 index 00000000..3927b1a0 --- /dev/null +++ b/lib/src/agent/session.dart @@ -0,0 +1,428 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:collection'; + +import 'package:collection/collection.dart'; +import 'package:uuid/uuid.dart'; + +import '../core/room.dart'; +import '../core/room_preconnect.dart'; +import '../events.dart'; +import '../logger.dart'; +import '../managers/event.dart'; +import '../participant/remote.dart'; +import '../support/disposable.dart'; +import '../token_source/jwt.dart'; +import '../token_source/token_source.dart'; +import '../types/other.dart'; +import 'agent.dart'; +import 'chat/message.dart'; +import 'chat/message_receiver.dart'; +import 'chat/message_sender.dart'; +import 'chat/text_message_sender.dart'; +import 'chat/transcription_stream_receiver.dart'; +import 'room_agent.dart'; +import 'session_options.dart'; + +/// A [Session] represents a connection to a LiveKit Room that can contain an agent. +/// +/// A session encapsulates the lifecycle of connecting to a room, dispatching an +/// agent, and relaying messages between the user and the agent. It exposes +/// observable state such as the agent's connection status, any session errors, +/// and the ordered history of messages exchanged during the session. +/// +/// To use a session, provide a token source (fixed or configurable) and call +/// [start]. When finished, call [end] to disconnect from the room. Messages can +/// be sent with [sendText], and the message history can be inspected or restored +/// via [messages], [getMessageHistory], and [restoreMessageHistory]. +/// +/// The session is designed to be observed from Flutter widgets (it extends +/// [ChangeNotifier] through [DisposableChangeNotifier]) in the same way that the +/// Swift implementation conforms to `ObservableObject`. +class Session extends DisposableChangeNotifier { + Session._({ + required _TokenSourceConfiguration tokenSourceConfiguration, + required SessionOptions options, + List? senders, + List? receivers, + }) : _tokenSourceConfiguration = tokenSourceConfiguration, + _options = options, + room = options.room { + _agent.addListener(notifyListeners); + + final textMessageSender = TextMessageSender(room: room); + final resolvedSenders = senders ?? [textMessageSender]; + final resolvedReceivers = + receivers ?? [textMessageSender, TranscriptionStreamReceiver(room: room)]; + + _senders.addAll(resolvedSenders); + _receivers.addAll(resolvedReceivers); + + _observeRoom(); + _observeReceivers(); + + onDispose(() async { + _agent.removeListener(notifyListeners); + await _roomListener?.dispose(); + await _cancelReceiverSubscriptions(); + await Future.wait(_receivers.toSet().map((receiver) => receiver.dispose())); + _agentTimeoutTimer?.cancel(); + }); + } + + /// Initializes a new [Session] with a fixed token source. + factory Session.fromFixedTokenSource( + TokenSourceFixed tokenSource, { + SessionOptions? options, + List? senders, + List? receivers, + }) { + return Session._( + tokenSourceConfiguration: _FixedTokenSourceConfiguration(tokenSource), + options: options ?? SessionOptions(), + senders: senders, + receivers: receivers, + ); + } + + /// Initializes a new [Session] with a configurable token source. + factory Session.fromConfigurableTokenSource( + TokenSourceConfigurable tokenSource, { + TokenRequestOptions tokenOptions = const TokenRequestOptions(), + SessionOptions? options, + List? senders, + List? receivers, + }) { + return Session._( + tokenSourceConfiguration: _ConfigurableTokenSourceConfiguration(tokenSource, tokenOptions), + options: options ?? SessionOptions(), + senders: senders, + receivers: receivers, + ); + } + + /// Creates a new [Session] configured for a specific agent. + factory Session.withAgent( + String agentName, { + String? agentMetadata, + required TokenSourceConfigurable tokenSource, + SessionOptions? options, + List? senders, + List? receivers, + }) { + return Session.fromConfigurableTokenSource( + tokenSource, + tokenOptions: TokenRequestOptions( + agentName: agentName, + agentMetadata: agentMetadata, + ), + options: options, + senders: senders, + receivers: receivers, + ); + } + + static final Uuid _uuid = const Uuid(); + + final Room room; + final SessionOptions _options; + final _TokenSourceConfiguration _tokenSourceConfiguration; + + final Agent _agent = Agent(); + Agent get agent => _agent; + + SessionError? get error => _error; + SessionError? _error; + + ConnectionState get connectionState => _connectionState; + ConnectionState _connectionState = ConnectionState.disconnected; + + bool get isConnected => switch (_connectionState) { + ConnectionState.connecting || ConnectionState.connected || ConnectionState.reconnecting => true, + ConnectionState.disconnected => false, + }; + + final LinkedHashMap _messages = LinkedHashMap(); + + List get messages => List.unmodifiable(_messages.values); + + final List _senders = []; + final List _receivers = []; + final List> _receiverSubscriptions = []; + + EventsListener? _roomListener; + Timer? _agentTimeoutTimer; + + /// Starts the session by fetching credentials and connecting to the room. + Future start() async { + if (room.connectionState != ConnectionState.disconnected) { + logger.info('Session.start() ignored: room already connecting or connected.'); + return; + } + + _setError(null); + _agentTimeoutTimer?.cancel(); + + final Duration timeout = _options.agentConnectTimeout; + + Future connect() async { + final response = await _tokenSourceConfiguration.fetch(); + await room.connect( + response.serverUrl, + response.participantToken, + ); + return response.dispatchesAgent(); + } + + try { + final bool dispatchesAgent; + if (_options.preConnectAudio) { + dispatchesAgent = await room.withPreConnectAudio( + () async { + _setConnectionState(ConnectionState.connecting); + _agent.connecting(buffering: true); + return connect(); + }, + timeout: timeout, + ); + } else { + _setConnectionState(ConnectionState.connecting); + _agent.connecting(buffering: false); + dispatchesAgent = await connect(); + await room.localParticipant?.setMicrophoneEnabled(true); + } + + if (dispatchesAgent) { + _agentTimeoutTimer = Timer(timeout, () { + if (isConnected && !_agent.isConnected) { + _agent.failed(AgentFailure.timeout); + } + }); + } else { + _agentTimeoutTimer?.cancel(); + _agentTimeoutTimer = null; + } + } catch (error, stackTrace) { + logger.warning('Session.start() failed: $error', error, stackTrace); + _setError(SessionError.connection(error)); + _setConnectionState(ConnectionState.disconnected); + _agent.disconnected(); + } + } + + /// Terminates the session and disconnects from the room. + Future end() async { + await room.disconnect(); + } + + /// Clears the last error. + void dismissError() { + _setError(null); + } + + /// Sends a text message to the agent. + /// + /// Returns the [SentMessage] if the message was sent by all senders, or + /// `null` if a sender failed. When a sender fails, the session error is set + /// to [SessionErrorKind.sender]. + Future sendText(String text) async { + final message = SentMessage( + id: _uuid.v4(), + timestamp: DateTime.timestamp(), + content: SentUserInput(text), + ); + + try { + for (final sender in _senders) { + await sender.send(message); + } + return message; + } catch (error, stackTrace) { + logger.warning('Session.sendText() failed: $error', error, stackTrace); + _setError(SessionError.sender(error)); + return null; + } + } + + /// Returns the message history. + List getMessageHistory() => List.unmodifiable(_messages.values); + + /// Restores the message history with the provided [messages]. + void restoreMessageHistory(List messages) { + _messages + ..clear() + ..addEntries( + messages.sorted((a, b) => a.timestamp.compareTo(b.timestamp)).map( + (message) => MapEntry(message.id, message), + ), + ); + notifyListeners(); + } + + void _observeRoom() { + final listener = room.createListener(); + listener.listen((event) async { + _handleRoomEvent(event); + }); + _roomListener = listener; + } + + void _observeReceivers() { + for (final receiver in _receivers) { + final subscription = receiver.messages().listen( + (message) { + final existing = _messages[message.id]; + final shouldNotify = existing != message; + _messages[message.id] = message; + if (shouldNotify) { + notifyListeners(); + } + }, + onError: (Object error, StackTrace stackTrace) { + logger.warning('Session receiver error: $error', error, stackTrace); + _setError(SessionError.receiver(error)); + }, + ); + _receiverSubscriptions.add(subscription); + } + } + + Future _cancelReceiverSubscriptions() async { + for (final subscription in _receiverSubscriptions) { + await subscription.cancel(); + } + _receiverSubscriptions.clear(); + } + + void _handleRoomEvent(RoomEvent event) { + bool shouldUpdateAgent = false; + + if (event is RoomConnectedEvent || event is RoomReconnectedEvent) { + _setConnectionState(ConnectionState.connected); + shouldUpdateAgent = true; + } else if (event is RoomReconnectingEvent) { + _setConnectionState(ConnectionState.reconnecting); + shouldUpdateAgent = true; + } else if (event is RoomDisconnectedEvent) { + _setConnectionState(ConnectionState.disconnected); + _agent.disconnected(); + shouldUpdateAgent = true; + } + + if (event is ParticipantEvent) { + shouldUpdateAgent = true; + } + + if (shouldUpdateAgent) { + _updateAgent(); + } + } + + void _updateAgent() { + final connectionState = room.connectionState; + _setConnectionState(connectionState); + + if (connectionState == ConnectionState.disconnected) { + _agent.disconnected(); + return; + } + + final RemoteParticipant? firstAgent = room.agentParticipants.firstOrNull; + if (firstAgent != null) { + _agent.connected(firstAgent); + } else if (_agent.isConnected) { + _agent.failed(AgentFailure.left); + } else { + _agent.connecting(buffering: _options.preConnectAudio); + } + } + + void _setConnectionState(ConnectionState state) { + if (_connectionState == state) { + return; + } + _connectionState = state; + notifyListeners(); + } + + void _setError(SessionError? newError) { + if (_error == newError) { + return; + } + _error = newError; + notifyListeners(); + } +} + +enum SessionErrorKind { + connection, + sender, + receiver, +} + +/// Represents an error that occurred during a [Session]. +class SessionError { + SessionError._(this.kind, this.cause); + + final SessionErrorKind kind; + final Object cause; + + String get message => switch (kind) { + SessionErrorKind.connection => 'Connection failed: ${cause}', + SessionErrorKind.sender => 'Message sender failed: ${cause}', + SessionErrorKind.receiver => 'Message receiver failed: ${cause}', + }; + + static SessionError connection(Object cause) => SessionError._(SessionErrorKind.connection, cause); + + static SessionError sender(Object cause) => SessionError._(SessionErrorKind.sender, cause); + + static SessionError receiver(Object cause) => SessionError._(SessionErrorKind.receiver, cause); + + @override + bool operator ==(Object other) { + if (identical(this, other)) return true; + return other is SessionError && other.kind == kind && other.cause == cause; + } + + @override + int get hashCode => Object.hash(kind, cause); +} + +sealed class _TokenSourceConfiguration { + const _TokenSourceConfiguration(); + + Future fetch(); +} + +class _FixedTokenSourceConfiguration extends _TokenSourceConfiguration { + const _FixedTokenSourceConfiguration(this.source); + + final TokenSourceFixed source; + + @override + Future fetch() => source.fetch(); +} + +class _ConfigurableTokenSourceConfiguration extends _TokenSourceConfiguration { + const _ConfigurableTokenSourceConfiguration(this.source, this.options); + + final TokenSourceConfigurable source; + final TokenRequestOptions options; + + @override + Future fetch() => source.fetch(options); +} diff --git a/lib/src/agent/session_options.dart b/lib/src/agent/session_options.dart new file mode 100644 index 00000000..eb6d262c --- /dev/null +++ b/lib/src/agent/session_options.dart @@ -0,0 +1,50 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import '../core/room.dart'; + +/// Options for creating a [Session]. +class SessionOptions { + /// The underlying [Room] used by the session. + final Room room; + + /// Whether to enable audio pre-connect with [PreConnectAudioBuffer]. + /// + /// If enabled, the microphone is activated before connecting to the room. + /// Ensure microphone permissions are requested early in the app lifecycle so + /// that pre-connect can succeed without additional prompts. + final bool preConnectAudio; + + /// The timeout for the agent to connect. If exceeded, the agent transitions + /// to a failed state. + final Duration agentConnectTimeout; + + SessionOptions({ + Room? room, + this.preConnectAudio = true, + this.agentConnectTimeout = const Duration(seconds: 20), + }) : room = room ?? Room(); + + SessionOptions copyWith({ + Room? room, + bool? preConnectAudio, + Duration? agentConnectTimeout, + }) { + return SessionOptions( + room: room ?? this.room, + preConnectAudio: preConnectAudio ?? this.preConnectAudio, + agentConnectTimeout: agentConnectTimeout ?? this.agentConnectTimeout, + ); + } +} diff --git a/lib/src/core/room_preconnect.dart b/lib/src/core/room_preconnect.dart index 8876d530..fa50b567 100644 --- a/lib/src/core/room_preconnect.dart +++ b/lib/src/core/room_preconnect.dart @@ -26,6 +26,7 @@ extension RoomPreConnect on Room { Duration timeout = const Duration(seconds: 10), PreConnectOnError? onError, }) async { + preConnectAudioBuffer.setErrorHandler(onError); await preConnectAudioBuffer.startRecording(timeout: timeout); try { final result = await operation(); diff --git a/lib/src/json/agent_attributes.dart b/lib/src/json/agent_attributes.dart new file mode 100644 index 00000000..2054fe55 --- /dev/null +++ b/lib/src/json/agent_attributes.dart @@ -0,0 +1,115 @@ +import 'dart:convert'; + +import 'package:json_annotation/json_annotation.dart'; + +import '../agent/constants.dart'; + +part 'agent_attributes.g.dart'; + +AgentAttributes agentAttributesFromJson(String source) => + AgentAttributes.fromJson(jsonDecode(source) as Map); + +String agentAttributesToJson(AgentAttributes value) => jsonEncode(value.toJson()); + +TranscriptionAttributes transcriptionAttributesFromJson(String source) => + TranscriptionAttributes.fromJson(jsonDecode(source) as Map); + +String transcriptionAttributesToJson(TranscriptionAttributes value) => jsonEncode(value.toJson()); + +@JsonSerializable() +class AgentAttributes { + const AgentAttributes({ + this.lkAgentInputs, + this.lkAgentOutputs, + this.lkAgentState, + this.lkPublishOnBehalf, + }); + + @JsonKey(name: lkAgentInputsAttributeKey) + final List? lkAgentInputs; + + @JsonKey(name: lkAgentOutputsAttributeKey) + final List? lkAgentOutputs; + + @JsonKey(name: lkAgentStateAttributeKey) + final AgentState? lkAgentState; + + @JsonKey(name: lkPublishOnBehalfAttributeKey) + final String? lkPublishOnBehalf; + + factory AgentAttributes.fromJson(Map json) => _$AgentAttributesFromJson(json); + + Map toJson() => _$AgentAttributesToJson(this); +} + +@JsonEnum(alwaysCreate: true) +enum AgentInput { + @JsonValue('audio') + AUDIO, + @JsonValue('text') + TEXT, + @JsonValue('video') + VIDEO, +} + +@JsonEnum(alwaysCreate: true) +enum AgentOutput { + @JsonValue('audio') + AUDIO, + @JsonValue('transcription') + TRANSCRIPTION, +} + +@JsonEnum(alwaysCreate: true) +enum AgentState { + @JsonValue('idle') + IDLE, + @JsonValue('initializing') + INITIALIZING, + @JsonValue('listening') + LISTENING, + @JsonValue('speaking') + SPEAKING, + @JsonValue('thinking') + THINKING, +} + +@JsonSerializable() +class TranscriptionAttributes { + const TranscriptionAttributes({ + this.lkSegmentId, + this.lkTranscribedTrackId, + this.lkTranscriptionFinal, + }); + + @JsonKey(name: 'lk.segment_id') + final String? lkSegmentId; + + @JsonKey(name: 'lk.transcribed_track_id') + final String? lkTranscribedTrackId; + + @JsonKey(name: 'lk.transcription_final', fromJson: _boolFromJson, toJson: _boolToJson) + final bool? lkTranscriptionFinal; + + factory TranscriptionAttributes.fromJson(Map json) => _$TranscriptionAttributesFromJson(json); + + Map toJson() => _$TranscriptionAttributesToJson(this); +} + +bool? _boolFromJson(Object? value) { + if (value is bool) { + return value; + } + if (value is String) { + final lower = value.toLowerCase(); + if (lower == 'true' || lower == '1') { + return true; + } + if (lower == 'false' || lower == '0') { + return false; + } + } + return null; +} + +Object? _boolToJson(bool? value) => value; diff --git a/lib/src/types/attribute_typings.g.dart b/lib/src/json/agent_attributes.g.dart similarity index 77% rename from lib/src/types/attribute_typings.g.dart rename to lib/src/json/agent_attributes.g.dart index 05814fe1..d2bb0b85 100644 --- a/lib/src/types/attribute_typings.g.dart +++ b/lib/src/json/agent_attributes.g.dart @@ -1,6 +1,6 @@ // GENERATED CODE - DO NOT MODIFY BY HAND -part of 'attribute_typings.dart'; +part of 'agent_attributes.dart'; // ************************************************************************** // JsonSerializableGenerator @@ -25,32 +25,32 @@ Map _$AgentAttributesToJson(AgentAttributes instance) => json) => TranscriptionAttributes( lkSegmentId: json['lk.segment_id'] as String?, lkTranscribedTrackId: json['lk.transcribed_track_id'] as String?, - lkTranscriptionFinal: json['lk.transcription_final'] as bool?, + lkTranscriptionFinal: _boolFromJson(json['lk.transcription_final']), ); Map _$TranscriptionAttributesToJson(TranscriptionAttributes instance) => { if (instance.lkSegmentId case final value?) 'lk.segment_id': value, if (instance.lkTranscribedTrackId case final value?) 'lk.transcribed_track_id': value, - if (instance.lkTranscriptionFinal case final value?) 'lk.transcription_final': value, + if (_boolToJson(instance.lkTranscriptionFinal) case final value?) 'lk.transcription_final': value, }; diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 03b79b42..c46b6fec 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -53,7 +53,7 @@ class PreConnectAudioBuffer { EventChannel? _eventChannel; StreamSubscription? _streamSubscription; - final PreConnectOnError? _onError; + PreConnectOnError? _onError; final int _requestSampleRate; int? _renderedSampleRate; @@ -301,4 +301,9 @@ class PreConnectAudioBuffer { logger.info( '[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio (${secondsOfAudio.toStringAsFixed(2)} seconds) to ${agents} agent(s)'); } + + /// Updates the callback invoked when pre-connect audio fails. + void setErrorHandler(PreConnectOnError? onError) { + _onError = onError; + } } diff --git a/lib/src/token_source/jwt.dart b/lib/src/token_source/jwt.dart index 35962987..a2ef8b2f 100644 --- a/lib/src/token_source/jwt.dart +++ b/lib/src/token_source/jwt.dart @@ -15,6 +15,7 @@ import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart'; import 'package:json_annotation/json_annotation.dart'; +import 'room_configuration.dart'; import 'token_source.dart'; part 'jwt.g.dart'; @@ -75,6 +76,18 @@ class LiveKitJwtPayload { return null; } + /// Room configuration embedded in the token, if present. + RoomConfiguration? get roomConfiguration { + final raw = _claims['roomConfig'] ?? _claims['room_config']; + if (raw is Map) { + return RoomConfiguration.fromJson(Map.from(raw)); + } + if (raw is Map) { + return RoomConfiguration.fromJson(Map.from(raw)); + } + return null; + } + /// Token expiration instant in UTC. DateTime? get expiresAt => _dateTimeFor('exp'); @@ -203,6 +216,12 @@ extension TokenSourceJwt on TokenSourceResponse { return true; } + + /// Returns `true` when the token's room configuration dispatches at least one agent. + bool dispatchesAgent() { + final agents = jwtPayload?.roomConfiguration?.agents; + return agents != null && agents.isNotEmpty; + } } /// Extension to extract LiveKit-specific claims from JWT tokens. diff --git a/lib/src/types/attribute_typings.dart b/lib/src/types/attribute_typings.dart deleted file mode 100644 index 2601b0f2..00000000 --- a/lib/src/types/attribute_typings.dart +++ /dev/null @@ -1,101 +0,0 @@ -// To parse this JSON data, do -// -// final agentAttributes = agentAttributesFromJson(jsonString); -// final transcriptionAttributes = transcriptionAttributesFromJson(jsonString); - -import 'dart:convert'; - -import 'package:json_annotation/json_annotation.dart'; - -part 'attribute_typings.g.dart'; - -AgentAttributes agentAttributesFromJson(String str) => AgentAttributes.fromJson(json.decode(str)); - -String agentAttributesToJson(AgentAttributes data) => json.encode(data.toJson()); - -TranscriptionAttributes transcriptionAttributesFromJson(String str) => - TranscriptionAttributes.fromJson(json.decode(str)); - -String transcriptionAttributesToJson(TranscriptionAttributes data) => json.encode(data.toJson()); - -@JsonSerializable() -class AgentAttributes { - @JsonKey(name: 'lk.agent.inputs') - List? lkAgentInputs; - - @JsonKey(name: 'lk.agent.outputs') - List? lkAgentOutputs; - - @JsonKey(name: 'lk.agent.state') - AgentState? lkAgentState; - - @JsonKey(name: 'lk.publish_on_behalf') - String? lkPublishOnBehalf; - - AgentAttributes({ - this.lkAgentInputs, - this.lkAgentOutputs, - this.lkAgentState, - this.lkPublishOnBehalf, - }); - - factory AgentAttributes.fromJson(Map json) => _$AgentAttributesFromJson(json); - Map toJson() => _$AgentAttributesToJson(this); -} - -@JsonEnum() -enum AgentInput { - @JsonValue('audio') - audio, - @JsonValue('text') - text, - @JsonValue('video') - video, -} - -@JsonEnum() -enum AgentOutput { - @JsonValue('audio') - audio, - @JsonValue('transcription') - transcription, -} - -@JsonEnum() -enum AgentState { - @JsonValue('idle') - idle, - @JsonValue('initializing') - initializing, - @JsonValue('listening') - listening, - @JsonValue('speaking') - speaking, - @JsonValue('thinking') - thinking, -} - -///Schema for transcription-related attributes -@JsonSerializable() -class TranscriptionAttributes { - ///The segment id of the transcription - @JsonKey(name: 'lk.segment_id') - String? lkSegmentId; - - ///The associated track id of the transcription - @JsonKey(name: 'lk.transcribed_track_id') - String? lkTranscribedTrackId; - - ///Whether the transcription is final - @JsonKey(name: 'lk.transcription_final') - bool? lkTranscriptionFinal; - - TranscriptionAttributes({ - this.lkSegmentId, - this.lkTranscribedTrackId, - this.lkTranscriptionFinal, - }); - - factory TranscriptionAttributes.fromJson(Map json) => _$TranscriptionAttributesFromJson(json); - Map toJson() => _$TranscriptionAttributesToJson(this); -} diff --git a/test/token/token_source_test.dart b/test/token/token_source_test.dart index 40f684d1..1fd620a6 100644 --- a/test/token/token_source_test.dart +++ b/test/token/token_source_test.dart @@ -174,6 +174,14 @@ void main() { 'hidden': false, 'recorder': true, }, + roomConfig: { + 'agents': [ + { + 'agent_name': 'demo-agent', + 'metadata': '{"foo":"bar"}', + } + ] + }, ); final response = TokenSourceResponse( @@ -201,6 +209,42 @@ void main() { expect(grant.canPublishSources, ['camera', 'screen']); expect(grant.hidden, isFalse); expect(grant.recorder, isTrue); + + final config = payload.roomConfiguration; + expect(config, isNotNull); + expect(config!.agents, isNotNull); + expect(config.agents, hasLength(1)); + expect(config.agents!.first.agentName, 'demo-agent'); + expect(config.agents!.first.metadata, '{"foo":"bar"}'); + }); + }); + + group('TokenSourceResponse', () { + test('dispatchesAgent returns true when JWT config includes agents', () { + final token = _generateToken( + roomConfig: { + 'agents': [ + {'agent_name': 'assistant'} + ] + }, + ); + + final response = TokenSourceResponse( + serverUrl: 'https://test.livekit.io', + participantToken: token, + ); + + expect(response.dispatchesAgent(), isTrue); + }); + + test('dispatchesAgent returns false when JWT lacks agents', () { + final token = _generateToken(); + final response = TokenSourceResponse( + serverUrl: 'https://test.livekit.io', + participantToken: token, + ); + + expect(response.dispatchesAgent(), isFalse); }); }); @@ -445,6 +489,7 @@ String _generateToken({ String? metadata, Map? attributes, Map? video, + Map? roomConfig, }) { final payload = { 'sub': subject ?? 'test-participant', @@ -483,6 +528,10 @@ String _generateToken({ payload['attributes'] = Map.from(attributes); } + if (roomConfig != null) { + payload['roomConfig'] = roomConfig; + } + final jwt = JWT(payload); return jwt.sign(SecretKey('test-secret')); }