Skip to content

Commit 35bf113

Browse files
committed
chore(rivetkit): correctly stringify engine runner request ids for logs
1 parent 9d011a4 commit 35bf113

File tree

5 files changed

+153
-72
lines changed

5 files changed

+153
-72
lines changed

engine/sdks/typescript/runner/src/tunnel.ts

Lines changed: 79 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type * as protocol from "@rivetkit/engine-runner-protocol";
22
import type { MessageId, RequestId } from "@rivetkit/engine-runner-protocol";
3-
import { v4 as uuidv4, stringify as uuidstringify } from "uuid";
3+
import { stringify as uuidstringify, v4 as uuidv4 } from "uuid";
44
import { logger } from "./log";
55
import type { ActorInstance, Runner } from "./mod";
66
import { unreachable } from "./utils";
@@ -77,12 +77,20 @@ export class Tunnel {
7777
// Build message
7878
const messageId = generateUuidBuffer();
7979

80-
const requestIdStr = bufferToString(requestId);
81-
this.#pendingTunnelMessages.set(bufferToString(messageId), {
80+
const requestIdStr = idToStr(requestId);
81+
const messageIdStr = idToStr(messageId);
82+
this.#pendingTunnelMessages.set(messageIdStr, {
8283
sentAt: Date.now(),
8384
requestIdStr,
8485
});
8586

87+
logger()?.debug({
88+
msg: "send tunnel msg",
89+
requestId: requestIdStr,
90+
messageId: messageIdStr,
91+
message: messageKind,
92+
});
93+
8694
// Send message
8795
const message: protocol.ToServer = {
8896
tag: "ToServerTunnelMessage",
@@ -111,8 +119,8 @@ export class Tunnel {
111119

112120
logger()?.debug({
113121
msg: "ack tunnel msg",
114-
requestId: uuidstringify(new Uint8Array(requestId)),
115-
messageId: uuidstringify(new Uint8Array(messageId)),
122+
requestId: idToStr(requestId),
123+
messageId: idToStr(messageId),
116124
});
117125

118126
this.#runner.__sendToServer(message);
@@ -163,7 +171,10 @@ export class Tunnel {
163171
const webSocket = this.#actorWebSockets.get(requestIdStr);
164172
if (webSocket) {
165173
// Close the WebSocket connection
166-
webSocket.__closeWithRetry(1000, "Message acknowledgment timeout");
174+
webSocket.__closeWithRetry(
175+
1000,
176+
"Message acknowledgment timeout",
177+
);
167178

168179
// Clean up from actorWebSockets map
169180
this.#actorWebSockets.delete(requestIdStr);
@@ -207,7 +218,11 @@ export class Tunnel {
207218
actor.webSockets.clear();
208219
}
209220

210-
async #fetch(actorId: string, requestId: protocol.RequestId, request: Request): Promise<Response> {
221+
async #fetch(
222+
actorId: string,
223+
requestId: protocol.RequestId,
224+
request: Request,
225+
): Promise<Response> {
211226
// Validate actor exists
212227
if (!this.#runner.hasActor(actorId)) {
213228
logger()?.warn({
@@ -219,7 +234,10 @@ export class Tunnel {
219234
//
220235
// See should_retry_request_inner
221236
// https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/guard-core/src/proxy_service.rs#L2458
222-
return new Response("Actor not found", { status: 503, headers: { "x-rivet-error": "runner.actor_not_found" } });
237+
return new Response("Actor not found", {
238+
status: 503,
239+
headers: { "x-rivet-error": "runner.actor_not_found" },
240+
});
223241
}
224242

225243
const fetchHandler = this.#runner.config.fetch(
@@ -237,19 +255,28 @@ export class Tunnel {
237255
}
238256

239257
async handleTunnelMessage(message: protocol.ToClientTunnelMessage) {
258+
const requestIdStr = idToStr(message.requestId);
259+
const messageIdStr = idToStr(message.messageId);
240260
logger()?.debug({
241-
msg: "tunnel msg",
242-
requestId: uuidstringify(new Uint8Array(message.requestId)),
243-
messageId: uuidstringify(new Uint8Array(message.messageId)),
261+
msg: "receive tunnel msg",
262+
requestId: requestIdStr,
263+
messageId: messageIdStr,
244264
message: message.messageKind,
245265
});
246266

247267
if (message.messageKind.tag === "TunnelAck") {
248268
// Mark pending message as acknowledged and remove it
249-
const msgIdStr = bufferToString(message.messageId);
250-
const pending = this.#pendingTunnelMessages.get(msgIdStr);
269+
const pending = this.#pendingTunnelMessages.get(messageIdStr);
251270
if (pending) {
252-
this.#pendingTunnelMessages.delete(msgIdStr);
271+
const didDelete =
272+
this.#pendingTunnelMessages.delete(messageIdStr);
273+
if (!didDelete) {
274+
logger()?.warn({
275+
msg: "received tunnel ack for nonexistent message",
276+
requestId: requestIdStr,
277+
messageId: messageIdStr,
278+
});
279+
}
253280
}
254281
} else {
255282
switch (message.messageKind.tag) {
@@ -282,14 +309,15 @@ export class Tunnel {
282309
message.messageKind.val,
283310
);
284311
break;
285-
case "ToClientWebSocketMessage":
312+
case "ToClientWebSocketMessage": {
286313
this.#sendAck(message.requestId, message.messageId);
287314

288-
let _unhandled = await this.#handleWebSocketMessage(
315+
const _unhandled = await this.#handleWebSocketMessage(
289316
message.requestId,
290317
message.messageKind.val,
291318
);
292319
break;
320+
}
293321
case "ToClientWebSocketClose":
294322
this.#sendAck(message.requestId, message.messageId);
295323

@@ -309,7 +337,7 @@ export class Tunnel {
309337
req: protocol.ToClientRequestStart,
310338
) {
311339
// Track this request for the actor
312-
const requestIdStr = bufferToString(requestId);
340+
const requestIdStr = idToStr(requestId);
313341
const actor = this.#runner.getActor(req.actorId);
314342
if (actor) {
315343
actor.requests.add(requestIdStr);
@@ -342,8 +370,8 @@ export class Tunnel {
342370
existing.actorId = req.actorId;
343371
} else {
344372
this.#actorPendingRequests.set(requestIdStr, {
345-
resolve: () => { },
346-
reject: () => { },
373+
resolve: () => {},
374+
reject: () => {},
347375
streamController: controller,
348376
actorId: req.actorId,
349377
});
@@ -366,7 +394,11 @@ export class Tunnel {
366394
await this.#sendResponse(requestId, response);
367395
} else {
368396
// Non-streaming request
369-
const response = await this.#fetch(req.actorId, requestId, request);
397+
const response = await this.#fetch(
398+
req.actorId,
399+
requestId,
400+
request,
401+
);
370402
await this.#sendResponse(requestId, response);
371403
}
372404
} catch (error) {
@@ -385,7 +417,7 @@ export class Tunnel {
385417
requestId: ArrayBuffer,
386418
chunk: protocol.ToClientRequestChunk,
387419
) {
388-
const requestIdStr = bufferToString(requestId);
420+
const requestIdStr = idToStr(requestId);
389421
const pending = this.#actorPendingRequests.get(requestIdStr);
390422
if (pending?.streamController) {
391423
pending.streamController.enqueue(new Uint8Array(chunk.body));
@@ -397,7 +429,7 @@ export class Tunnel {
397429
}
398430

399431
async #handleRequestAbort(requestId: ArrayBuffer) {
400-
const requestIdStr = bufferToString(requestId);
432+
const requestIdStr = idToStr(requestId);
401433
const pending = this.#actorPendingRequests.get(requestIdStr);
402434
if (pending?.streamController) {
403435
pending.streamController.error(new Error("Request aborted"));
@@ -461,7 +493,7 @@ export class Tunnel {
461493
requestId: protocol.RequestId,
462494
open: protocol.ToClientWebSocketOpen,
463495
) {
464-
const webSocketId = bufferToString(requestId);
496+
const webSocketId = idToStr(requestId);
465497
// Validate actor exists
466498
const actor = this.#runner.getActor(open.actorId);
467499
if (!actor) {
@@ -518,7 +550,7 @@ export class Tunnel {
518550
const dataBuffer =
519551
typeof data === "string"
520552
? (new TextEncoder().encode(data)
521-
.buffer as ArrayBuffer)
553+
.buffer as ArrayBuffer)
522554
: data;
523555

524556
this.#sendMessage(requestId, {
@@ -575,7 +607,12 @@ export class Tunnel {
575607
});
576608

577609
// Send open confirmation
578-
let hibernationConfig = this.#runner.config.getActorHibernationConfig(actor.actorId, requestId, request);
610+
const hibernationConfig =
611+
this.#runner.config.getActorHibernationConfig(
612+
actor.actorId,
613+
requestId,
614+
request,
615+
);
579616
this.#sendMessage(requestId, {
580617
tag: "ToServerWebSocketOpen",
581618
val: {
@@ -587,8 +624,6 @@ export class Tunnel {
587624
// Notify adapter that connection is open
588625
adapter._handleOpen(requestId);
589626

590-
591-
592627
// Call websocket handler
593628
await websocketHandler(
594629
this.#runner,
@@ -623,14 +658,19 @@ export class Tunnel {
623658
requestId: ArrayBuffer,
624659
msg: protocol.ToClientWebSocketMessage,
625660
): Promise<boolean> {
626-
const webSocketId = bufferToString(requestId);
661+
const webSocketId = idToStr(requestId);
627662
const adapter = this.#actorWebSockets.get(webSocketId);
628663
if (adapter) {
629664
const data = msg.binary
630665
? new Uint8Array(msg.data)
631666
: new TextDecoder().decode(new Uint8Array(msg.data));
632667

633-
return adapter._handleMessage(requestId, data, msg.index, msg.binary);
668+
return adapter._handleMessage(
669+
requestId,
670+
data,
671+
msg.index,
672+
msg.binary,
673+
);
634674
} else {
635675
return true;
636676
}
@@ -639,11 +679,12 @@ export class Tunnel {
639679
__ackWebsocketMessage(requestId: ArrayBuffer, index: number) {
640680
logger()?.debug({
641681
msg: "ack ws msg",
642-
requestId: uuidstringify(new Uint8Array(requestId)),
682+
requestId: idToStr(requestId),
643683
index,
644684
});
645685

646-
if (index < 0 || index > 65535) throw new Error("invalid websocket ack index");
686+
if (index < 0 || index > 65535)
687+
throw new Error("invalid websocket ack index");
647688

648689
// Send the ack message
649690
this.#sendMessage(requestId, {
@@ -658,27 +699,26 @@ export class Tunnel {
658699
requestId: ArrayBuffer,
659700
close: protocol.ToClientWebSocketClose,
660701
) {
661-
const webSocketId = bufferToString(requestId);
662-
const adapter = this.#actorWebSockets.get(webSocketId);
702+
const requestIdStr = idToStr(requestId);
703+
const adapter = this.#actorWebSockets.get(requestIdStr);
663704
if (adapter) {
664705
adapter._handleClose(
665706
requestId,
666707
close.code || undefined,
667708
close.reason || undefined,
668709
);
669-
this.#actorWebSockets.delete(webSocketId);
710+
this.#actorWebSockets.delete(requestIdStr);
670711
}
671712
}
672713
}
673714

674-
/** Converts a buffer to a string. Used for storing strings in a lookup map. */
675-
function bufferToString(buffer: ArrayBuffer): string {
676-
return Buffer.from(buffer).toString("base64");
677-
}
678-
679715
/** Generates a UUID as bytes. */
680716
function generateUuidBuffer(): ArrayBuffer {
681717
const buffer = new Uint8Array(16);
682718
uuidv4(undefined, buffer);
683719
return buffer.buffer;
684720
}
721+
722+
function idToStr(id: ArrayBuffer): string {
723+
return uuidstringify(new Uint8Array(id));
724+
}

0 commit comments

Comments
 (0)