Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/client/src/utils/BaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ export type FormatConfig = {
sampleRate: number;
};

export type WebSocketConnectionConfig = {
reconnect: boolean;
maxReconnectAttempts?: number;
reconnectDelayMs?: number;
}

export type DisconnectionDetails =
| {
reason: "error";
Expand Down Expand Up @@ -93,6 +99,7 @@ export type BaseSessionConfig = {
connectionDelay?: DelayConfig;
textOnly?: boolean;
userId?: string;
webSocketConnectionConfig?: WebSocketConnectionConfig;
};

export type ConnectionType = "websocket" | "webrtc";
Expand Down
135 changes: 106 additions & 29 deletions packages/client/src/utils/WebSocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {
type SessionConfig,
type FormatConfig,
parseFormat,
PublicSessionConfig,
PrivateWebSocketSessionConfig,
} from "./BaseConnection";
import { PACKAGE_VERSION } from "../version";
import {
Expand All @@ -15,39 +17,70 @@ import { constructOverrides } from "./overrides";
const MAIN_PROTOCOL = "convai";
const WSS_API_ORIGIN = "wss://api.elevenlabs.io";
const WSS_API_PATHNAME = "/v1/convai/conversation?agent_id=";
const RECONNECT_DELAY_MS = 1000;
const MAX_RECONNECT_ATTEMPTS = 5;

export class WebSocketConnection extends BaseConnection {
public readonly conversationId: string;
public readonly inputFormat: FormatConfig;
public readonly outputFormat: FormatConfig;
private readonly sessionConfig: SessionConfig;

private reconnectAttempts = 0;
private intentionalClose = false;

private handleErrorListener = this.handleOnError.bind(this);
private handleCloseListener = this.handleOnClose.bind(this);
private handleMessageListener = this.handleOnMessage.bind(this);

private constructor(
private readonly socket: WebSocket,
private socket: WebSocket,
conversationId: string,
inputFormat: FormatConfig,
outputFormat: FormatConfig
outputFormat: FormatConfig,
sessionConfig: SessionConfig
) {
super();
this.conversationId = conversationId;
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.sessionConfig = sessionConfig;

this.socket.addEventListener("error", event => {
// In case the error event is followed by a close event, we want the
// latter to be the one that disconnects the session as it contains more
// useful information.
setTimeout(
() =>
this.disconnect({
reason: "error",
message: "The connection was closed due to a socket error.",
context: event,
}),
0
);
});
this.setupSocketListeners(socket);
}

private setupSocketListeners(socket: WebSocket) {
socket.addEventListener("error", this.handleErrorListener);
socket.addEventListener("close", this.handleCloseListener);
socket.addEventListener("message", this.handleMessageListener);
}

private removeSocketListeners() {
this.socket.removeEventListener("error", this.handleErrorListener);
this.socket.removeEventListener("close", this.handleCloseListener);
this.socket.removeEventListener("message", this.handleMessageListener);
}

private handleOnError(event: Event) {
// In case the error event is followed by a close event, we want the
// latter to be the one that disconnects the session as it contains more
// useful information.
setTimeout(
() =>
this.disconnect({
reason: "error",
message: "The connection was closed due to a socket error.",
context: event,
}),
0
);
}

this.socket.addEventListener("close", event => {
private handleOnClose(event: CloseEvent) {
const reconnectConfig = this.sessionConfig.webSocketConnectionConfig;
if (!this.intentionalClose && reconnectConfig?.reconnect) {
this.tryReconnect(event);
} else {
this.disconnect(
event.code === 1000
? {
Expand All @@ -61,17 +94,17 @@ export class WebSocketConnection extends BaseConnection {
context: event,
}
);
});

this.socket.addEventListener("message", event => {
try {
const parsedEvent = JSON.parse(event.data);
if (!isValidSocketEvent(parsedEvent)) {
return;
}
this.handleMessage(parsedEvent);
} catch (_) {}
});
}
}

private handleOnMessage(event: MessageEvent) {
try {
const parsedEvent = JSON.parse(event.data);
if (!isValidSocketEvent(parsedEvent)) {
return;
}
this.handleMessage(parsedEvent);
} catch (_) {}
}

public static async create(
Expand Down Expand Up @@ -155,15 +188,59 @@ export class WebSocketConnection extends BaseConnection {
socket,
conversation_id,
inputFormat,
outputFormat
outputFormat,
config
);
} catch (error) {
socket?.close();
throw error;
}
}

private async tryReconnect(event: CloseEvent) {
const reconnectConfig = this.sessionConfig.webSocketConnectionConfig;

if (!reconnectConfig?.reconnect) {
this.intentionalClose = true;
this.handleOnClose(event);
return;
}

const maxReconnectAttempts =
reconnectConfig?.maxReconnectAttempts ?? MAX_RECONNECT_ATTEMPTS;

const reconnectDelayMs =
reconnectConfig?.reconnectDelayMs ?? RECONNECT_DELAY_MS;

if (this.reconnectAttempts >= maxReconnectAttempts) {
this.intentionalClose = true;
this.handleOnClose(event);
return;
}

this.reconnectAttempts++;
const delay = reconnectDelayMs * this.reconnectAttempts;

await new Promise(resolve => setTimeout(resolve, delay));

try {
const newConnection = await WebSocketConnection.create(this.sessionConfig);
this.replaceSocket(newConnection.socket);
this.reconnectAttempts = 0;
} catch (e) {
this.tryReconnect(event);
}
}

private replaceSocket(newSocket: WebSocket) {
this.removeSocketListeners();
this.socket.close();
this.socket = newSocket;
this.setupSocketListeners(newSocket);
}

public close() {
this.intentionalClose = true;
this.socket.close();
}

Expand Down
Loading