From 6ccc6d4dfe3c81a69a1bbb846347a53f65d9e092 Mon Sep 17 00:00:00 2001 From: Aditya Hegde Date: Tue, 30 Dec 2025 11:09:39 +0530 Subject: [PATCH 1/4] chore: unify event emitter --- .../chat/core/conversation-manager.ts | 55 ++++++------- .../src/features/chat/core/conversation.ts | 54 +++++-------- .../features/chat/core/input/ChatInput.svelte | 12 ++- .../file-and-resource-watcher.ts | 2 +- .../sample-data/generate-sample-data.ts | 12 ++- web-common/src/lib/actions/modified-click.ts | 2 +- web-common/src/lib/event-bus/event-bus.ts | 77 ++----------------- web-common/src/lib/event-emitter.ts | 54 +++++++++++++ .../runtime-client/sse-connection-manager.ts | 44 +++-------- .../src/runtime-client/sse-fetch-client.ts | 67 ++++------------ 10 files changed, 157 insertions(+), 222 deletions(-) create mode 100644 web-common/src/lib/event-emitter.ts diff --git a/web-common/src/features/chat/core/conversation-manager.ts b/web-common/src/features/chat/core/conversation-manager.ts index d69fee84178..e654c4cb4eb 100644 --- a/web-common/src/features/chat/core/conversation-manager.ts +++ b/web-common/src/features/chat/core/conversation-manager.ts @@ -46,26 +46,17 @@ export class ConversationManager { private static readonly MAX_CONCURRENT_STREAMS = 3; private newConversation: Conversation; + private newConversationUnsub: (() => void) | null = null; private conversations = new Map(); private conversationSelector: ConversationSelector; - private agent?: string; + private readonly agent?: string; constructor( public readonly instanceId: string, options: ConversationManagerOptions, ) { this.agent = options.agent; - this.newConversation = new Conversation( - this.instanceId, - NEW_CONVERSATION_ID, - { - agent: this.agent, - onStreamStart: () => this.enforceMaxConcurrentStreams(), - onConversationCreated: (conversationId: string) => { - this.handleConversationCreated(conversationId); - }, - }, - ); + this.createNewConversation(); switch (options.conversationState) { case "url": @@ -121,10 +112,10 @@ export class ConversationManager { const conversation = new Conversation( this.instanceId, $conversationId, - { - agent: this.agent, - onStreamStart: () => this.enforceMaxConcurrentStreams(), - }, + this.agent, + ); + conversation.on("stream-start", () => + this.enforceMaxConcurrentStreams(), ); this.conversations.set($conversationId, conversation); return conversation; @@ -162,6 +153,26 @@ export class ConversationManager { // ===== PRIVATE IMPLEMENTATION ===== + private createNewConversation() { + this.newConversationUnsub?.(); + this.newConversation = new Conversation( + this.instanceId, + NEW_CONVERSATION_ID, + this.agent, + ); + const streamStartUnsub = this.newConversation.on("stream-start", () => + this.enforceMaxConcurrentStreams(), + ); + const conversationStartedUnsub = this.newConversation.on( + "conversation-created", + (conversationId) => this.handleConversationCreated(conversationId), + ); + this.newConversationUnsub = () => { + streamStartUnsub(); + conversationStartedUnsub(); + }; + } + // ----- Stream Management ----- /** @@ -221,17 +232,7 @@ export class ConversationManager { this.conversations.set(conversationId, this.newConversation); // Create a fresh "new" conversation instance - this.newConversation = new Conversation( - this.instanceId, - NEW_CONVERSATION_ID, - { - agent: this.agent, - onStreamStart: () => this.enforceMaxConcurrentStreams(), - onConversationCreated: (conversationId: string) => { - this.handleConversationCreated(conversationId); - }, - }, - ); + this.createNewConversation(); } } diff --git a/web-common/src/features/chat/core/conversation.ts b/web-common/src/features/chat/core/conversation.ts index 388a36611c1..4cb8c64ad47 100644 --- a/web-common/src/features/chat/core/conversation.ts +++ b/web-common/src/features/chat/core/conversation.ts @@ -24,6 +24,15 @@ import { invalidateConversationsList, NEW_CONVERSATION_ID, } from "./utils"; +import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts"; + +type ConversionEvents = { + "conversation-created": string; + "stream-start": void; + message: V1Message; + "stream-complete": string; + error: string; +}; /** * Individual conversation state management. @@ -31,7 +40,7 @@ import { * Handles streaming message sending, optimistic updates, and conversation-specific queries * for a single conversation using the streaming completion endpoint. */ -export class Conversation { +export class Conversation extends EventEmitter { // Public reactive state public readonly draftMessage = writable(""); public readonly isStreaming = writable(false); @@ -44,17 +53,9 @@ export class Conversation { constructor( private readonly instanceId: string, public conversationId: string, - private readonly options: { - agent?: string; - onStreamStart?: () => void; - onConversationCreated?: (conversationId: string) => void; - } = { - agent: ToolName.ANALYST_AGENT, // Hardcoded default for now - }, + private readonly agent: string = ToolName.ANALYST_AGENT, // Hardcoded default for now ) { - if (this.options) { - this.options.agent ??= ToolName.ANALYST_AGENT; - } + super(); } // ===== PUBLIC API ===== @@ -114,15 +115,7 @@ export class Conversation { * @param context - Chat context to be sent with the message * @param options - Callback functions for different stages of message sending */ - public async sendMessage( - context: RuntimeServiceCompleteBody, - options?: { - onStreamStart?: () => void; - onMessage?: (message: V1Message) => void; - onStreamComplete?: (conversationId: string) => void; - onError?: (error: string) => void; - }, - ): Promise { + public async sendMessage(context: RuntimeServiceCompleteBody): Promise { // Prevent concurrent message sending if (get(this.isStreaming)) { this.streamError.set("Please wait for the current response to complete"); @@ -141,19 +134,15 @@ export class Conversation { const userMessage = this.addOptimisticUserMessage(prompt); try { - options?.onStreamStart?.(); + this.emit("stream-start"); // Start streaming - this establishes the connection - const streamPromise = this.startStreaming( - prompt, - context, - options?.onMessage, - ); + const streamPromise = this.startStreaming(prompt, context); // Wait for streaming to complete await streamPromise; // Stream has completed successfully - options?.onStreamComplete?.(this.conversationId); + this.emit("stream-complete", this.conversationId); // Temporary fix to make sure the title of the conversation is updated. void invalidateConversationsList(this.instanceId); @@ -171,7 +160,7 @@ export class Conversation { userMessage, this.hasReceivedFirstMessage, ); - options?.onError?.(this.formatTransportError(error)); + this.emit("error", this.formatTransportError(error)); } finally { this.isStreaming.set(false); } @@ -213,7 +202,6 @@ export class Conversation { private async startStreaming( prompt: string, context: RuntimeServiceCompleteBody | undefined, - onMessage: ((message: V1Message) => void) | undefined, ): Promise { // Initialize SSE client if not already done if (!this.sseClient) { @@ -238,7 +226,7 @@ export class Conversation { message.data, ); this.processStreamingResponse(response); - if (response.message) onMessage?.(response.message); + if (response.message) this.emit("message", response.message); } catch (error) { console.error("Failed to parse streaming response:", error); this.streamError.set("Failed to process server response"); @@ -276,12 +264,12 @@ export class Conversation { ? undefined : this.conversationId, prompt, - agent: this.options?.agent, + agent: this.agent, ...context, }; // Notify that streaming is about to start (for concurrent stream management) - this.options?.onStreamStart?.(); + this.emit("stream-start"); // Start streaming - this will establish the connection and then stream until completion await this.sseClient.start(baseUrl, { @@ -360,7 +348,7 @@ export class Conversation { this.conversationId = realConversationId; // Notify that conversation was created - this.options?.onConversationCreated?.(realConversationId); + this.emit("conversation-created", realConversationId); } // ----- Cache Management ----- diff --git a/web-common/src/features/chat/core/input/ChatInput.svelte b/web-common/src/features/chat/core/input/ChatInput.svelte index 365d6396124..5b1f3d7e972 100644 --- a/web-common/src/features/chat/core/input/ChatInput.svelte +++ b/web-common/src/features/chat/core/input/ChatInput.svelte @@ -27,6 +27,14 @@ $: draftMessageStore = currentConversation.draftMessage; $: isStreamingStore = currentConversation.isStreaming; + let streamStartUnsub: (() => void) | undefined = undefined; + $: { + streamStartUnsub?.(); + streamStartUnsub = currentConversation.on("stream-start", () => { + editor.commands.setContent(""); + }); + } + $: value = $draftMessageStore; $: disabled = $getConversationQuery?.isLoading || $isStreamingStore; $: canSend = !disabled && value.trim(); @@ -40,9 +48,7 @@ // Message handling with input focus try { - await currentConversation.sendMessage($additionalContextStore, { - onStreamStart: () => editor.commands.setContent(""), - }); + await currentConversation.sendMessage($additionalContextStore); onSend?.(); } catch (error) { console.error("Failed to send message:", error); diff --git a/web-common/src/features/entity-management/file-and-resource-watcher.ts b/web-common/src/features/entity-management/file-and-resource-watcher.ts index 3b175d6acc2..a85f8291102 100644 --- a/web-common/src/features/entity-management/file-and-resource-watcher.ts +++ b/web-common/src/features/entity-management/file-and-resource-watcher.ts @@ -177,7 +177,7 @@ export class FileAndResourceWatcher { await invalidate("init"); - eventBus.emit("rill-yaml-updated", null); + eventBus.emit("rill-yaml-updated"); } this.seenFiles.add(res.path); break; diff --git a/web-common/src/features/sample-data/generate-sample-data.ts b/web-common/src/features/sample-data/generate-sample-data.ts index d5fc4aa243f..a19346b25a8 100644 --- a/web-common/src/features/sample-data/generate-sample-data.ts +++ b/web-common/src/features/sample-data/generate-sample-data.ts @@ -62,9 +62,11 @@ export async function generateSampleData( }, }, }); - const conversation = new Conversation(instanceId, NEW_CONVERSATION_ID, { - agent: ToolName.DEVELOPER_AGENT, - }); + const conversation = new Conversation( + instanceId, + NEW_CONVERSATION_ID, + ToolName.DEVELOPER_AGENT, + ); const agentPrompt = `Generate a NEW model with fresh data for the following user prompt: ${userPrompt}`; conversation.draftMessage.set(agentPrompt); @@ -124,15 +126,17 @@ export async function generateSampleData( } } }; + const handleMessageUnsub = conversation.on("message", handleMessage); let cancelled = false; conversation.cancelStream(); - await conversation.sendMessage({}, { onMessage: handleMessage }); + await conversation.sendMessage({}); await waitUntil(() => !get(conversation.isStreaming)); + handleMessageUnsub(); overlay.set(null); if (cancelled) return; if (!created) { diff --git a/web-common/src/lib/actions/modified-click.ts b/web-common/src/lib/actions/modified-click.ts index 0bfd30f61b1..bcca21d58e2 100644 --- a/web-common/src/lib/actions/modified-click.ts +++ b/web-common/src/lib/actions/modified-click.ts @@ -28,7 +28,7 @@ export function modified(params: Params) { } const event = (modifier ? `${modifier}-click` : "click") as keyof Events; - eventBus.emit(event, null); + eventBus.emit(event); if (handler) { await handler(e); diff --git a/web-common/src/lib/event-bus/event-bus.ts b/web-common/src/lib/event-bus/event-bus.ts index 7cc69393ab6..b59e51ab023 100644 --- a/web-common/src/lib/event-bus/event-bus.ts +++ b/web-common/src/lib/event-bus/event-bus.ts @@ -3,83 +3,20 @@ import type { PageContentResized, NotificationMessage, } from "./events"; - -class EventBus { - private listeners: EventMap = new Map(); - - on(event: Event, callback: Listener) { - const key = generateUUID(); - const eventMap = this.listeners.get(event); - - if (!eventMap) { - this.listeners.set( - event, - new Map>([[key, callback]]), - ); - } else { - eventMap.set(key, callback); - } - - const unsubscribe = () => this.listeners.get(event)?.delete(key); - - return unsubscribe; - } - - once(event: Event, callback: Listener) { - const unsubscribe = this.on(event, (payload) => { - callback(payload); - unsubscribe(); - }); - - return unsubscribe; - } - - emit(event: Event, payload: Events[Event]) { - const listeners = this.listeners.get(event); - - listeners?.forEach((cb) => { - cb(payload); - }); - } -} - -function generateUUID(): string { - // Generate random numbers for the UUID - const randomNumbers: number[] = new Array(16) - .fill(0) - .map(() => Math.floor(Math.random() * 256)); - - // Set the version and variant bits - randomNumbers[6] = (randomNumbers[6] & 0x0f) | 0x40; // Version 4 - randomNumbers[8] = (randomNumbers[8] & 0x3f) | 0x80; // Variant 10 - - // Convert to hexadecimal and format as a UUID - const hexDigits: string = randomNumbers - .map((b) => b.toString(16).padStart(2, "0")) - .join(""); - return `${hexDigits.slice(0, 8)}-${hexDigits.slice(8, 12)}-${hexDigits.slice(12, 16)}-${hexDigits.slice(16, 20)}-${hexDigits.slice(20, 32)}`; -} - -export const eventBus = new EventBus(); +import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts"; export interface Events { notification: NotificationMessage; "clear-all-notifications": void; "add-banner": BannerEvent; "remove-banner": string; - "shift-click": null; - "command-click": null; - click: null; - "shift-command-click": null; + "shift-click": void; + "command-click": void; + click: void; + "shift-command-click": void; "page-content-resized": PageContentResized; "start-chat": string; - "rill-yaml-updated": null; + "rill-yaml-updated": void; } -type T = keyof Events; - -type Listener = (e: Events[EventType]) => void; - -type EventMap = Map; - -type Listeners = Map>; +export const eventBus = new EventEmitter(); diff --git a/web-common/src/lib/event-emitter.ts b/web-common/src/lib/event-emitter.ts new file mode 100644 index 00000000000..45a661047be --- /dev/null +++ b/web-common/src/lib/event-emitter.ts @@ -0,0 +1,54 @@ +import { v4 as uuidv4 } from "uuid"; + +type VoidType = void | Promise; +type Listener< + Events extends Record, + E extends keyof Events, +> = Events[E] extends void ? () => VoidType : (arg: Events[E]) => VoidType; +type Args< + Events extends Record, + E extends keyof Events, +> = Events[E] extends void ? [] : [Events[E]]; + +export class EventEmitter> { + private readonly listeners = new Map< + keyof Events, + Map> + >(); + + public on(event: E, listener: Listener) { + const key = uuidv4(); + const eventMap = this.listeners.get(event); + + if (!eventMap) { + this.listeners.set(event, new Map([[key, listener]])); + } else { + eventMap.set(key, listener); + } + + const unsubscribe = () => this.listeners.get(event)?.delete(key); + + return unsubscribe; + } + + once(event: E, listener: Listener) { + const unsubscribe = this.on(event, ((...args: Args) => { + (listener as any)(...args); + unsubscribe(); + }) as Listener); + + return unsubscribe; + } + + public emit(event: E, ...args: Args) { + const listeners = this.listeners.get(event); + + listeners?.forEach((listener) => { + (listener as any)(...args); + }); + } + + public clearListeners() { + this.listeners.clear(); + } +} diff --git a/web-common/src/runtime-client/sse-connection-manager.ts b/web-common/src/runtime-client/sse-connection-manager.ts index e0d34404bac..5b590077d86 100644 --- a/web-common/src/runtime-client/sse-connection-manager.ts +++ b/web-common/src/runtime-client/sse-connection-manager.ts @@ -2,6 +2,7 @@ import { get, writable } from "svelte/store"; import { Throttler } from "../lib/throttler"; import { asyncWait } from "../lib/waitUtils"; import { SSEFetchClient, type SSEMessage } from "./sse-fetch-client"; +import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts"; const BACKOFF_DELAY = 1000; // Base delay in ms @@ -22,26 +23,20 @@ export enum ConnectionStatus { CLOSED = "closed", } -type EventMap = { - message: T; +type SSEConnectionManagerEvents = { + message: SSEMessage; reconnect: void; error: Error; close: void; open: void; }; -type Listeners = Record, Callback>[]>; - -type Callback> = ( - eventData: EventMap[K], -) => void | Promise; - // ===== SSE CONNECTION MANAGER ===== /** * A wrapper around SSEFetchClient to manage status and reconnections */ -export class SSEConnectionManager { +export class SSEConnectionManager extends EventEmitter { public status = writable(ConnectionStatus.CLOSED); public url: string; @@ -52,13 +47,6 @@ export class SSEConnectionManager { }; private client = new SSEFetchClient(); - private listeners: Listeners = { - message: [], - reconnect: [], - error: [], - close: [], - open: [], - }; private autoCloseThrottler: Throttler | undefined; private retryAttempts = writable(0); @@ -66,6 +54,8 @@ export class SSEConnectionManager { private connectionCount = 0; constructor(public params?: Params) { + super(); + if (params?.autoCloseTimeouts) { this.autoCloseThrottler = new Throttler( params.autoCloseTimeouts.normal, @@ -79,13 +69,6 @@ export class SSEConnectionManager { this.client.on("open", this.handleSuccessfulConnection); } - public on>( - event: K, - listener: Callback, - ) { - this.listeners[event].push(listener); - } - /** * Handle reconnection with exponential backoff */ @@ -172,9 +155,8 @@ export class SSEConnectionManager { void this.reconnect(); } - this.listeners.error.forEach((listener) => - listener(error instanceof Error ? error : new Error(String(error))), - ); + const errorArg = error instanceof Error ? error : new Error(String(error)); + this.emit("error", errorArg); }; // This can happen in one of three situations: @@ -192,12 +174,12 @@ export class SSEConnectionManager { } else { this.close(); - this.listeners.close.forEach((listener) => listener()); + this.emit("close"); } }; private handleMessage = (message: SSEMessage) => { - this.listeners.message.forEach((listener) => listener(message)); + this.emit("message", message); }; private handleSuccessfulConnection = () => { @@ -207,7 +189,7 @@ export class SSEConnectionManager { this.retryAttempts.set(0); if (this.connectionCount > 1) { - this.listeners.reconnect.forEach((cb) => void cb()); + this.emit("reconnect"); } }; @@ -264,8 +246,6 @@ export class SSEConnectionManager { this.pause(); // Clear all event listeners - this.listeners.message = []; - this.listeners.error = []; - this.listeners.close = []; + this.clearListeners(); } } diff --git a/web-common/src/runtime-client/sse-fetch-client.ts b/web-common/src/runtime-client/sse-fetch-client.ts index 785bc87fba5..206cc4f15a3 100644 --- a/web-common/src/runtime-client/sse-fetch-client.ts +++ b/web-common/src/runtime-client/sse-fetch-client.ts @@ -1,5 +1,6 @@ import { runtime } from "@rilldata/web-common/runtime-client/runtime-store"; import { get } from "svelte/store"; +import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts"; /** * Represents a Server-Sent Event message @@ -78,6 +79,13 @@ function isValidEvent(event: Partial): event is SSEMessage { return event.data !== undefined && event.data !== ""; } +type SSEFetchClientEvents = { + message: SSEMessage; + error: Error; + close: void; + open: void; +}; + // ===== SSE FETCH CLIENT ===== /** @@ -87,49 +95,8 @@ function isValidEvent(event: Partial): event is SSEMessage { * interpret the semantic meaning of events. Consumers decide how to handle * different event types and data formats. */ -export class SSEFetchClient { +export class SSEFetchClient extends EventEmitter { private abortController: AbortController | undefined; - private listeners: { - message: ((message: SSEMessage) => void)[]; - error: ((error: Error) => void)[]; - close: (() => void)[]; - open: (() => void)[]; - } = { - message: [], - error: [], - close: [], - open: [], - }; - - /** - * Add event listener for SSE events - */ - public on(event: "message", listener: (message: SSEMessage) => void): void; - public on(event: "error", listener: (error: Error) => void): void; - public on(event: "close", listener: () => void): void; - public on(event: "open", listener: () => void): void; - public on(event: string, listener: any): void { - if (this.listeners[event as keyof typeof this.listeners]) { - this.listeners[event as keyof typeof this.listeners].push(listener); - } - } - - /** - * Remove event listener - */ - public off(event: "message", listener: (message: SSEMessage) => void): void; - public off(event: "error", listener: (error: Error) => void): void; - public off(event: "close", listener: () => void): void; - public off(event: "open", listener: () => void): void; - public off(event: string, listener: any): void { - const eventListeners = this.listeners[event as keyof typeof this.listeners]; - if (eventListeners) { - const index = eventListeners.indexOf(listener); - if (index > -1) { - eventListeners.splice(index, 1); - } - } - } /** * Start streaming from the given URL @@ -181,19 +148,19 @@ export class SSEFetchClient { throw new Error("No response body"); } - this.listeners.open.forEach((listener) => listener()); + this.emit("open"); // Process the SSE stream await this.processSSEStream(response.body); } catch (error) { if (error.name !== "AbortError") { - this.listeners.error.forEach((listener) => - listener(error instanceof Error ? error : new Error(String(error))), - ); + const errorArg = + error instanceof Error ? error : new Error(String(error)); + this.emit("error", errorArg); } } finally { this.stop(); - this.listeners.close.forEach((listener) => listener()); + this.emit("close"); } } @@ -215,9 +182,7 @@ export class SSEFetchClient { this.stop(); // Clear all event listeners - this.listeners.message = []; - this.listeners.error = []; - this.listeners.close = []; + this.clearListeners(); } /** @@ -278,6 +243,6 @@ export class SSEFetchClient { * Emit a message to all registered listeners */ private emitMessage(message: SSEMessage): void { - this.listeners.message.forEach((listener) => listener(message)); + this.emit("message", message); } } From d718c730fbb2f9d8049be03bf23257ed4e29918f Mon Sep 17 00:00:00 2001 From: Aditya Hegde Date: Tue, 30 Dec 2025 15:22:14 +0530 Subject: [PATCH 2/4] Tweaks --- web-common/src/features/chat/core/conversation.ts | 2 ++ web-common/src/features/chat/core/input/ChatInput.svelte | 1 + 2 files changed, 3 insertions(+) diff --git a/web-common/src/features/chat/core/conversation.ts b/web-common/src/features/chat/core/conversation.ts index 4cb8c64ad47..aa849a609d0 100644 --- a/web-common/src/features/chat/core/conversation.ts +++ b/web-common/src/features/chat/core/conversation.ts @@ -189,6 +189,8 @@ export class Conversation extends EventEmitter { this.sseClient.cleanup(); this.sseClient = null; } + + this.clearListeners(); } // ===== PRIVATE IMPLEMENTATION ===== diff --git a/web-common/src/features/chat/core/input/ChatInput.svelte b/web-common/src/features/chat/core/input/ChatInput.svelte index 5b1f3d7e972..eb4164871ac 100644 --- a/web-common/src/features/chat/core/input/ChatInput.svelte +++ b/web-common/src/features/chat/core/input/ChatInput.svelte @@ -114,6 +114,7 @@ chatMounted.set(false); editor.destroy(); unsubStartChatEvent(); + streamStartUnsub?.(); }; }); From 3375365041df9cdac408a65b574e98579184a6de Mon Sep 17 00:00:00 2001 From: Aditya Hegde Date: Tue, 6 Jan 2026 11:06:14 +0530 Subject: [PATCH 3/4] PR comments --- .../src/features/chat/core/conversation.ts | 30 +++++++++++-------- web-common/src/lib/event-emitter.ts | 14 +++++---- .../runtime-client/sse-connection-manager.ts | 22 +++++++++----- .../src/runtime-client/sse-fetch-client.ts | 29 +++++++++--------- 4 files changed, 55 insertions(+), 40 deletions(-) diff --git a/web-common/src/features/chat/core/conversation.ts b/web-common/src/features/chat/core/conversation.ts index aa849a609d0..5a05c7e9683 100644 --- a/web-common/src/features/chat/core/conversation.ts +++ b/web-common/src/features/chat/core/conversation.ts @@ -26,7 +26,7 @@ import { } from "./utils"; import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts"; -type ConversionEvents = { +type ConversationEvents = { "conversation-created": string; "stream-start": void; message: V1Message; @@ -40,12 +40,20 @@ type ConversionEvents = { * Handles streaming message sending, optimistic updates, and conversation-specific queries * for a single conversation using the streaming completion endpoint. */ -export class Conversation extends EventEmitter { +export class Conversation { // Public reactive state public readonly draftMessage = writable(""); public readonly isStreaming = writable(false); public readonly streamError = writable(null); + private readonly events = new EventEmitter(); + public readonly on = this.events.on.bind( + this.events, + ) as typeof this.events.on; + public readonly once = this.events.once.bind( + this.events, + ) as typeof this.events.once; + // Private state private sseClient: SSEFetchClient | null = null; private hasReceivedFirstMessage = false; @@ -54,9 +62,7 @@ export class Conversation extends EventEmitter { private readonly instanceId: string, public conversationId: string, private readonly agent: string = ToolName.ANALYST_AGENT, // Hardcoded default for now - ) { - super(); - } + ) {} // ===== PUBLIC API ===== @@ -134,7 +140,7 @@ export class Conversation extends EventEmitter { const userMessage = this.addOptimisticUserMessage(prompt); try { - this.emit("stream-start"); + this.events.emit("stream-start"); // Start streaming - this establishes the connection const streamPromise = this.startStreaming(prompt, context); @@ -142,7 +148,7 @@ export class Conversation extends EventEmitter { await streamPromise; // Stream has completed successfully - this.emit("stream-complete", this.conversationId); + this.events.emit("stream-complete", this.conversationId); // Temporary fix to make sure the title of the conversation is updated. void invalidateConversationsList(this.instanceId); @@ -160,7 +166,7 @@ export class Conversation extends EventEmitter { userMessage, this.hasReceivedFirstMessage, ); - this.emit("error", this.formatTransportError(error)); + this.events.emit("error", this.formatTransportError(error)); } finally { this.isStreaming.set(false); } @@ -190,7 +196,7 @@ export class Conversation extends EventEmitter { this.sseClient = null; } - this.clearListeners(); + this.events.clearListeners(); } // ===== PRIVATE IMPLEMENTATION ===== @@ -228,7 +234,7 @@ export class Conversation extends EventEmitter { message.data, ); this.processStreamingResponse(response); - if (response.message) this.emit("message", response.message); + if (response.message) this.events.emit("message", response.message); } catch (error) { console.error("Failed to parse streaming response:", error); this.streamError.set("Failed to process server response"); @@ -271,7 +277,7 @@ export class Conversation extends EventEmitter { }; // Notify that streaming is about to start (for concurrent stream management) - this.emit("stream-start"); + this.events.emit("stream-start"); // Start streaming - this will establish the connection and then stream until completion await this.sseClient.start(baseUrl, { @@ -350,7 +356,7 @@ export class Conversation extends EventEmitter { this.conversationId = realConversationId; // Notify that conversation was created - this.emit("conversation-created", realConversationId); + this.events.emit("conversation-created", realConversationId); } // ----- Cache Management ----- diff --git a/web-common/src/lib/event-emitter.ts b/web-common/src/lib/event-emitter.ts index 45a661047be..3c165528edf 100644 --- a/web-common/src/lib/event-emitter.ts +++ b/web-common/src/lib/event-emitter.ts @@ -1,11 +1,10 @@ import { v4 as uuidv4 } from "uuid"; -type VoidType = void | Promise; type Listener< Events extends Record, E extends keyof Events, -> = Events[E] extends void ? () => VoidType : (arg: Events[E]) => VoidType; -type Args< +> = Events[E] extends void ? () => void : (arg: Events[E]) => void; +type EventPayload< Events extends Record, E extends keyof Events, > = Events[E] extends void ? [] : [Events[E]]; @@ -31,8 +30,8 @@ export class EventEmitter> { return unsubscribe; } - once(event: E, listener: Listener) { - const unsubscribe = this.on(event, ((...args: Args) => { + public once(event: E, listener: Listener) { + const unsubscribe = this.on(event, ((...args: EventPayload) => { (listener as any)(...args); unsubscribe(); }) as Listener); @@ -40,7 +39,10 @@ export class EventEmitter> { return unsubscribe; } - public emit(event: E, ...args: Args) { + public emit( + event: E, + ...args: EventPayload + ) { const listeners = this.listeners.get(event); listeners?.forEach((listener) => { diff --git a/web-common/src/runtime-client/sse-connection-manager.ts b/web-common/src/runtime-client/sse-connection-manager.ts index 5b590077d86..9f61fd13099 100644 --- a/web-common/src/runtime-client/sse-connection-manager.ts +++ b/web-common/src/runtime-client/sse-connection-manager.ts @@ -36,7 +36,7 @@ type SSEConnectionManagerEvents = { /** * A wrapper around SSEFetchClient to manage status and reconnections */ -export class SSEConnectionManager extends EventEmitter { +export class SSEConnectionManager { public status = writable(ConnectionStatus.CLOSED); public url: string; @@ -46,6 +46,14 @@ export class SSEConnectionManager extends EventEmitter; }; + private readonly events = new EventEmitter(); + public readonly on = this.events.on.bind( + this.events, + ) as typeof this.events.on; + public readonly once = this.events.once.bind( + this.events, + ) as typeof this.events.once; + private client = new SSEFetchClient(); private autoCloseThrottler: Throttler | undefined; @@ -54,8 +62,6 @@ export class SSEConnectionManager extends EventEmitter { - this.emit("message", message); + this.events.emit("message", message); }; private handleSuccessfulConnection = () => { @@ -189,7 +195,7 @@ export class SSEConnectionManager extends EventEmitter 1) { - this.emit("reconnect"); + this.events.emit("reconnect"); } }; @@ -246,6 +252,6 @@ export class SSEConnectionManager extends EventEmitter { +export class SSEFetchClient { private abortController: AbortController | undefined; + private readonly events = new EventEmitter(); + public readonly on = this.events.on.bind( + this.events, + ) as typeof this.events.on; + public readonly once = this.events.once.bind( + this.events, + ) as typeof this.events.once; + /** * Start streaming from the given URL * @@ -148,7 +156,7 @@ export class SSEFetchClient extends EventEmitter { throw new Error("No response body"); } - this.emit("open"); + this.events.emit("open"); // Process the SSE stream await this.processSSEStream(response.body); @@ -156,11 +164,11 @@ export class SSEFetchClient extends EventEmitter { if (error.name !== "AbortError") { const errorArg = error instanceof Error ? error : new Error(String(error)); - this.emit("error", errorArg); + this.events.emit("error", errorArg); } } finally { this.stop(); - this.emit("close"); + this.events.emit("close"); } } @@ -182,7 +190,7 @@ export class SSEFetchClient extends EventEmitter { this.stop(); // Clear all event listeners - this.clearListeners(); + this.events.clearListeners(); } /** @@ -220,7 +228,7 @@ export class SSEFetchClient extends EventEmitter { if (isEventComplete(line)) { // Empty line signals end of event - emit if valid if (isValidEvent(currentEvent)) { - this.emitMessage(currentEvent); + this.events.emit("message", currentEvent); } currentEvent = {}; } else { @@ -232,17 +240,10 @@ export class SSEFetchClient extends EventEmitter { // Emit any remaining event in the buffer if (isValidEvent(currentEvent)) { - this.emitMessage(currentEvent); + this.events.emit("message", currentEvent); } } finally { reader.releaseLock(); } } - - /** - * Emit a message to all registered listeners - */ - private emitMessage(message: SSEMessage): void { - this.emit("message", message); - } } From 60803e22ed162fc554c7b1e910f0491ca308dfa9 Mon Sep 17 00:00:00 2001 From: Aditya Hegde Date: Wed, 7 Jan 2026 11:02:39 +0530 Subject: [PATCH 4/4] Add callbacks for direct callers --- web-common/src/features/chat/core/conversation.ts | 8 ++++++-- .../src/features/chat/core/input/ChatInput.svelte | 13 +++---------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/web-common/src/features/chat/core/conversation.ts b/web-common/src/features/chat/core/conversation.ts index 5a05c7e9683..da0d1d64f16 100644 --- a/web-common/src/features/chat/core/conversation.ts +++ b/web-common/src/features/chat/core/conversation.ts @@ -121,7 +121,10 @@ export class Conversation { * @param context - Chat context to be sent with the message * @param options - Callback functions for different stages of message sending */ - public async sendMessage(context: RuntimeServiceCompleteBody): Promise { + public async sendMessage( + context: RuntimeServiceCompleteBody, + options?: { onStreamStart?: () => void }, + ): Promise { // Prevent concurrent message sending if (get(this.isStreaming)) { this.streamError.set("Please wait for the current response to complete"); @@ -140,7 +143,8 @@ export class Conversation { const userMessage = this.addOptimisticUserMessage(prompt); try { - this.events.emit("stream-start"); + options?.onStreamStart?.(); // Callback for direct callers + this.events.emit("stream-start"); // Event for external listeners // Start streaming - this establishes the connection const streamPromise = this.startStreaming(prompt, context); diff --git a/web-common/src/features/chat/core/input/ChatInput.svelte b/web-common/src/features/chat/core/input/ChatInput.svelte index eb4164871ac..365d6396124 100644 --- a/web-common/src/features/chat/core/input/ChatInput.svelte +++ b/web-common/src/features/chat/core/input/ChatInput.svelte @@ -27,14 +27,6 @@ $: draftMessageStore = currentConversation.draftMessage; $: isStreamingStore = currentConversation.isStreaming; - let streamStartUnsub: (() => void) | undefined = undefined; - $: { - streamStartUnsub?.(); - streamStartUnsub = currentConversation.on("stream-start", () => { - editor.commands.setContent(""); - }); - } - $: value = $draftMessageStore; $: disabled = $getConversationQuery?.isLoading || $isStreamingStore; $: canSend = !disabled && value.trim(); @@ -48,7 +40,9 @@ // Message handling with input focus try { - await currentConversation.sendMessage($additionalContextStore); + await currentConversation.sendMessage($additionalContextStore, { + onStreamStart: () => editor.commands.setContent(""), + }); onSend?.(); } catch (error) { console.error("Failed to send message:", error); @@ -114,7 +108,6 @@ chatMounted.set(false); editor.destroy(); unsubStartChatEvent(); - streamStartUnsub?.(); }; });