Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions web-common/src/features/chat/core/conversation-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,17 @@ export class ConversationManager {
private static readonly MAX_CONCURRENT_STREAMS = 3;

private newConversation: Conversation;
private newConversationUnsub: (() => void) | null = null;
private conversations = new Map<string, Conversation>();
private conversationSelector: ConversationSelector;
private agent?: string;
private readonly agent?: string;

constructor(
public readonly instanceId: string,
options: ConversationManagerOptions,
) {
this.agent = options.agent;
this.newConversation = new Conversation(
this.instanceId,
NEW_CONVERSATION_ID,
{
agent: this.agent,
onStreamStart: () => this.enforceMaxConcurrentStreams(),
onConversationCreated: (conversationId: string) => {
this.handleConversationCreated(conversationId);
},
},
);
this.createNewConversation();

switch (options.conversationState) {
case "url":
Expand Down Expand Up @@ -121,10 +112,10 @@ export class ConversationManager {
const conversation = new Conversation(
this.instanceId,
$conversationId,
{
agent: this.agent,
onStreamStart: () => this.enforceMaxConcurrentStreams(),
},
this.agent,
);
conversation.on("stream-start", () =>
this.enforceMaxConcurrentStreams(),
);
this.conversations.set($conversationId, conversation);
return conversation;
Expand Down Expand Up @@ -162,6 +153,26 @@ export class ConversationManager {

// ===== PRIVATE IMPLEMENTATION =====

private createNewConversation() {
this.newConversationUnsub?.();
this.newConversation = new Conversation(
this.instanceId,
NEW_CONVERSATION_ID,
this.agent,
);
const streamStartUnsub = this.newConversation.on("stream-start", () =>
this.enforceMaxConcurrentStreams(),
);
const conversationStartedUnsub = this.newConversation.on(
"conversation-created",
(conversationId) => this.handleConversationCreated(conversationId),
);
this.newConversationUnsub = () => {
streamStartUnsub();
conversationStartedUnsub();
};
}

// ----- Stream Management -----

/**
Expand Down Expand Up @@ -221,17 +232,7 @@ export class ConversationManager {
this.conversations.set(conversationId, this.newConversation);

// Create a fresh "new" conversation instance
this.newConversation = new Conversation(
this.instanceId,
NEW_CONVERSATION_ID,
{
agent: this.agent,
onStreamStart: () => this.enforceMaxConcurrentStreams(),
onConversationCreated: (conversationId: string) => {
this.handleConversationCreated(conversationId);
},
},
);
this.createNewConversation();
}
}

Expand Down
62 changes: 31 additions & 31 deletions web-common/src/features/chat/core/conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ import {
invalidateConversationsList,
NEW_CONVERSATION_ID,
} from "./utils";
import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts";

type ConversationEvents = {
"conversation-created": string;
"stream-start": void;
message: V1Message;
"stream-complete": string;
error: string;
};

/**
* Individual conversation state management.
Expand All @@ -37,25 +46,23 @@ export class Conversation {
public readonly isStreaming = writable(false);
public readonly streamError = writable<string | null>(null);

private readonly events = new EventEmitter<ConversationEvents>();
public readonly on = this.events.on.bind(
this.events,
) as typeof this.events.on;
public readonly once = this.events.once.bind(
this.events,
) as typeof this.events.once;

// Private state
private sseClient: SSEFetchClient | null = null;
private hasReceivedFirstMessage = false;

constructor(
private readonly instanceId: string,
public conversationId: string,
private readonly options: {
agent?: string;
onStreamStart?: () => void;
onConversationCreated?: (conversationId: string) => void;
} = {
agent: ToolName.ANALYST_AGENT, // Hardcoded default for now
},
) {
if (this.options) {
this.options.agent ??= ToolName.ANALYST_AGENT;
}
}
private readonly agent: string = ToolName.ANALYST_AGENT, // Hardcoded default for now
) {}

// ===== PUBLIC API =====

Expand Down Expand Up @@ -116,12 +123,7 @@ export class Conversation {
*/
public async sendMessage(
context: RuntimeServiceCompleteBody,
options?: {
onStreamStart?: () => void;
onMessage?: (message: V1Message) => void;
onStreamComplete?: (conversationId: string) => void;
onError?: (error: string) => void;
},
options?: { onStreamStart?: () => void },
): Promise<void> {
// Prevent concurrent message sending
if (get(this.isStreaming)) {
Expand All @@ -141,19 +143,16 @@ export class Conversation {
const userMessage = this.addOptimisticUserMessage(prompt);

try {
options?.onStreamStart?.();
options?.onStreamStart?.(); // Callback for direct callers
this.events.emit("stream-start"); // Event for external listeners
// Start streaming - this establishes the connection
const streamPromise = this.startStreaming(
prompt,
context,
options?.onMessage,
);
const streamPromise = this.startStreaming(prompt, context);

// Wait for streaming to complete
await streamPromise;

// Stream has completed successfully
options?.onStreamComplete?.(this.conversationId);
this.events.emit("stream-complete", this.conversationId);

// Temporary fix to make sure the title of the conversation is updated.
void invalidateConversationsList(this.instanceId);
Expand All @@ -171,7 +170,7 @@ export class Conversation {
userMessage,
this.hasReceivedFirstMessage,
);
options?.onError?.(this.formatTransportError(error));
this.events.emit("error", this.formatTransportError(error));
} finally {
this.isStreaming.set(false);
}
Expand Down Expand Up @@ -200,6 +199,8 @@ export class Conversation {
this.sseClient.cleanup();
this.sseClient = null;
}

this.events.clearListeners();
}

// ===== PRIVATE IMPLEMENTATION =====
Expand All @@ -213,7 +214,6 @@ export class Conversation {
private async startStreaming(
prompt: string,
context: RuntimeServiceCompleteBody | undefined,
onMessage: ((message: V1Message) => void) | undefined,
): Promise<void> {
// Initialize SSE client if not already done
if (!this.sseClient) {
Expand All @@ -238,7 +238,7 @@ export class Conversation {
message.data,
);
this.processStreamingResponse(response);
if (response.message) onMessage?.(response.message);
if (response.message) this.events.emit("message", response.message);
} catch (error) {
console.error("Failed to parse streaming response:", error);
this.streamError.set("Failed to process server response");
Expand Down Expand Up @@ -276,12 +276,12 @@ export class Conversation {
? undefined
: this.conversationId,
prompt,
agent: this.options?.agent,
agent: this.agent,
...context,
};

// Notify that streaming is about to start (for concurrent stream management)
this.options?.onStreamStart?.();
this.events.emit("stream-start");

// Start streaming - this will establish the connection and then stream until completion
await this.sseClient.start(baseUrl, {
Expand Down Expand Up @@ -360,7 +360,7 @@ export class Conversation {
this.conversationId = realConversationId;

// Notify that conversation was created
this.options?.onConversationCreated?.(realConversationId);
this.events.emit("conversation-created", realConversationId);
}

// ----- Cache Management -----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export class FileAndResourceWatcher {

await invalidate("init");

eventBus.emit("rill-yaml-updated", null);
eventBus.emit("rill-yaml-updated");
}
this.seenFiles.add(res.path);
break;
Expand Down
12 changes: 8 additions & 4 deletions web-common/src/features/sample-data/generate-sample-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ export async function generateSampleData(
},
},
});
const conversation = new Conversation(instanceId, NEW_CONVERSATION_ID, {
agent: ToolName.DEVELOPER_AGENT,
});
const conversation = new Conversation(
instanceId,
NEW_CONVERSATION_ID,
ToolName.DEVELOPER_AGENT,
);
const agentPrompt = `Generate a NEW model with fresh data for the following user prompt: ${userPrompt}`;
conversation.draftMessage.set(agentPrompt);

Expand Down Expand Up @@ -124,15 +126,17 @@ export async function generateSampleData(
}
}
};
const handleMessageUnsub = conversation.on("message", handleMessage);

let cancelled = false;

conversation.cancelStream();

await conversation.sendMessage({}, { onMessage: handleMessage });
await conversation.sendMessage({});

await waitUntil(() => !get(conversation.isStreaming));

handleMessageUnsub();
overlay.set(null);
if (cancelled) return;
if (!created) {
Expand Down
2 changes: 1 addition & 1 deletion web-common/src/lib/actions/modified-click.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function modified(params: Params) {
}

const event = (modifier ? `${modifier}-click` : "click") as keyof Events;
eventBus.emit(event, null);
eventBus.emit(event);

if (handler) {
await handler(e);
Expand Down
77 changes: 7 additions & 70 deletions web-common/src/lib/event-bus/event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,20 @@ import type {
PageContentResized,
NotificationMessage,
} from "./events";

class EventBus {
private listeners: EventMap = new Map();

on<Event extends T>(event: Event, callback: Listener<Event>) {
const key = generateUUID();
const eventMap = this.listeners.get(event);

if (!eventMap) {
this.listeners.set(
event,
new Map<string, Listener<T>>([[key, callback]]),
);
} else {
eventMap.set(key, callback);
}

const unsubscribe = () => this.listeners.get(event)?.delete(key);

return unsubscribe;
}

once<Event extends T>(event: Event, callback: Listener<Event>) {
const unsubscribe = this.on(event, (payload) => {
callback(payload);
unsubscribe();
});

return unsubscribe;
}

emit<Event extends T>(event: Event, payload: Events[Event]) {
const listeners = this.listeners.get(event);

listeners?.forEach((cb) => {
cb(payload);
});
}
}

function generateUUID(): string {
// Generate random numbers for the UUID
const randomNumbers: number[] = new Array(16)
.fill(0)
.map(() => Math.floor(Math.random() * 256));

// Set the version and variant bits
randomNumbers[6] = (randomNumbers[6] & 0x0f) | 0x40; // Version 4
randomNumbers[8] = (randomNumbers[8] & 0x3f) | 0x80; // Variant 10

// Convert to hexadecimal and format as a UUID
const hexDigits: string = randomNumbers
.map((b) => b.toString(16).padStart(2, "0"))
.join("");
return `${hexDigits.slice(0, 8)}-${hexDigits.slice(8, 12)}-${hexDigits.slice(12, 16)}-${hexDigits.slice(16, 20)}-${hexDigits.slice(20, 32)}`;
}

export const eventBus = new EventBus();
import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts";

export interface Events {
notification: NotificationMessage;
"clear-all-notifications": void;
"add-banner": BannerEvent;
"remove-banner": string;
"shift-click": null;
"command-click": null;
click: null;
"shift-command-click": null;
"shift-click": void;
"command-click": void;
click: void;
"shift-command-click": void;
"page-content-resized": PageContentResized;
"start-chat": string;
"rill-yaml-updated": null;
"rill-yaml-updated": void;
}

type T = keyof Events;

type Listener<EventType extends T> = (e: Events[EventType]) => void;

type EventMap = Map<T, Listeners>;

type Listeners = Map<string, Listener<T>>;
export const eventBus = new EventEmitter<Events>();
Loading
Loading