From a08c694955b9188426bfb935ba7963d5bfa21a8f Mon Sep 17 00:00:00 2001 From: jake-luster Date: Tue, 12 Aug 2025 19:52:25 -0500 Subject: [PATCH] feat: Introduce WebSocket Reconnect Behavior --- packages/client/src/utils/BaseConnection.ts | 7 + .../client/src/utils/WebSocketConnection.ts | 135 ++++++++++++++---- 2 files changed, 113 insertions(+), 29 deletions(-) diff --git a/packages/client/src/utils/BaseConnection.ts b/packages/client/src/utils/BaseConnection.ts index 50f995b2..fa204222 100644 --- a/packages/client/src/utils/BaseConnection.ts +++ b/packages/client/src/utils/BaseConnection.ts @@ -47,6 +47,12 @@ export type FormatConfig = { sampleRate: number; }; +export type WebSocketConnectionConfig = { + reconnect: boolean; + maxReconnectAttempts?: number; + reconnectDelayMs?: number; +} + export type DisconnectionDetails = | { reason: "error"; @@ -93,6 +99,7 @@ export type BaseSessionConfig = { connectionDelay?: DelayConfig; textOnly?: boolean; userId?: string; + webSocketConnectionConfig?: WebSocketConnectionConfig; }; export type ConnectionType = "websocket" | "webrtc"; diff --git a/packages/client/src/utils/WebSocketConnection.ts b/packages/client/src/utils/WebSocketConnection.ts index 9179afc0..9f0c8bf9 100644 --- a/packages/client/src/utils/WebSocketConnection.ts +++ b/packages/client/src/utils/WebSocketConnection.ts @@ -3,6 +3,8 @@ import { type SessionConfig, type FormatConfig, parseFormat, + PublicSessionConfig, + PrivateWebSocketSessionConfig, } from "./BaseConnection"; import { PACKAGE_VERSION } from "../version"; import { @@ -15,39 +17,70 @@ import { constructOverrides } from "./overrides"; const MAIN_PROTOCOL = "convai"; const WSS_API_ORIGIN = "wss://api.elevenlabs.io"; const WSS_API_PATHNAME = "/v1/convai/conversation?agent_id="; +const RECONNECT_DELAY_MS = 1000; +const MAX_RECONNECT_ATTEMPTS = 5; export class WebSocketConnection extends BaseConnection { public readonly conversationId: string; public readonly inputFormat: FormatConfig; public readonly outputFormat: FormatConfig; + private readonly sessionConfig: SessionConfig; + + private reconnectAttempts = 0; + private intentionalClose = false; + + private handleErrorListener = this.handleOnError.bind(this); + private handleCloseListener = this.handleOnClose.bind(this); + private handleMessageListener = this.handleOnMessage.bind(this); private constructor( - private readonly socket: WebSocket, + private socket: WebSocket, conversationId: string, inputFormat: FormatConfig, - outputFormat: FormatConfig + outputFormat: FormatConfig, + sessionConfig: SessionConfig ) { super(); this.conversationId = conversationId; this.inputFormat = inputFormat; this.outputFormat = outputFormat; + this.sessionConfig = sessionConfig; - this.socket.addEventListener("error", event => { - // In case the error event is followed by a close event, we want the - // latter to be the one that disconnects the session as it contains more - // useful information. - setTimeout( - () => - this.disconnect({ - reason: "error", - message: "The connection was closed due to a socket error.", - context: event, - }), - 0 - ); - }); + this.setupSocketListeners(socket); + } + + private setupSocketListeners(socket: WebSocket) { + socket.addEventListener("error", this.handleErrorListener); + socket.addEventListener("close", this.handleCloseListener); + socket.addEventListener("message", this.handleMessageListener); + } + + private removeSocketListeners() { + this.socket.removeEventListener("error", this.handleErrorListener); + this.socket.removeEventListener("close", this.handleCloseListener); + this.socket.removeEventListener("message", this.handleMessageListener); + } + + private handleOnError(event: Event) { + // In case the error event is followed by a close event, we want the + // latter to be the one that disconnects the session as it contains more + // useful information. + setTimeout( + () => + this.disconnect({ + reason: "error", + message: "The connection was closed due to a socket error.", + context: event, + }), + 0 + ); + } - this.socket.addEventListener("close", event => { + private handleOnClose(event: CloseEvent) { + const reconnectConfig = this.sessionConfig.webSocketConnectionConfig; + if (!this.intentionalClose && reconnectConfig?.reconnect) { + this.tryReconnect(event); + } else { this.disconnect( event.code === 1000 ? { @@ -61,17 +94,17 @@ export class WebSocketConnection extends BaseConnection { context: event, } ); - }); - - this.socket.addEventListener("message", event => { - try { - const parsedEvent = JSON.parse(event.data); - if (!isValidSocketEvent(parsedEvent)) { - return; - } - this.handleMessage(parsedEvent); - } catch (_) {} - }); + } + } + + private handleOnMessage(event: MessageEvent) { + try { + const parsedEvent = JSON.parse(event.data); + if (!isValidSocketEvent(parsedEvent)) { + return; + } + this.handleMessage(parsedEvent); + } catch (_) {} } public static async create( @@ -155,7 +188,8 @@ export class WebSocketConnection extends BaseConnection { socket, conversation_id, inputFormat, - outputFormat + outputFormat, + config ); } catch (error) { socket?.close(); @@ -163,7 +197,50 @@ export class WebSocketConnection extends BaseConnection { } } + private async tryReconnect(event: CloseEvent) { + const reconnectConfig = this.sessionConfig.webSocketConnectionConfig; + + if (!reconnectConfig?.reconnect) { + this.intentionalClose = true; + this.handleOnClose(event); + return; + } + + const maxReconnectAttempts = + reconnectConfig?.maxReconnectAttempts ?? MAX_RECONNECT_ATTEMPTS; + + const reconnectDelayMs = + reconnectConfig?.reconnectDelayMs ?? RECONNECT_DELAY_MS; + + if (this.reconnectAttempts >= maxReconnectAttempts) { + this.intentionalClose = true; + this.handleOnClose(event); + return; + } + + this.reconnectAttempts++; + const delay = reconnectDelayMs * this.reconnectAttempts; + + await new Promise(resolve => setTimeout(resolve, delay)); + + try { + const newConnection = await WebSocketConnection.create(this.sessionConfig); + this.replaceSocket(newConnection.socket); + this.reconnectAttempts = 0; + } catch (e) { + this.tryReconnect(event); + } + } + + private replaceSocket(newSocket: WebSocket) { + this.removeSocketListeners(); + this.socket.close(); + this.socket = newSocket; + this.setupSocketListeners(newSocket); + } + public close() { + this.intentionalClose = true; this.socket.close(); }