@@ -76,6 +76,10 @@ export class EngineActorDriver implements ActorDriver {
7676 #runnerStarted: PromiseWithResolvers < undefined > = promiseWithResolvers ( ) ;
7777 #runnerStopped: PromiseWithResolvers < undefined > = promiseWithResolvers ( ) ;
7878
79+ // WebSocket message acknowledgment debouncing
80+ #wsAckQueue: Map < string , bigint > = new Map ( ) ;
81+ #wsAckFlushInterval?: NodeJS . Timeout ;
82+
7983 constructor (
8084 registryConfig : RegistryConfig ,
8185 runConfig : RunnerConfig ,
@@ -284,6 +288,15 @@ export class EngineActorDriver implements ActorDriver {
284288 namespace : runConfig . namespace ,
285289 runnerName : runConfig . runnerName ,
286290 } ) ;
291+
292+ // Start WebSocket ack flush interval
293+ //
294+ // Decreasing this reduces the amount of buffered messages on the
295+ // gateway
296+ //
297+ // Gateway timeout configured to 30s
298+ // https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17
299+ this . #wsAckFlushInterval = setInterval ( ( ) => this . #flushWsAcks( ) , 1000 ) ;
287300 }
288301
289302 async #loadActorHandler( actorId : string ) : Promise < ActorHandler > {
@@ -302,6 +315,20 @@ export class EngineActorDriver implements ActorDriver {
302315 return handler . actor ;
303316 }
304317
318+ #flushWsAcks( ) : void {
319+ if ( this . #wsAckQueue. size === 0 ) return ;
320+
321+ for ( const [ requestIdStr , messageIndex ] of this . #wsAckQueue. entries ( ) ) {
322+ // Convert string back to ArrayBuffer
323+ const requestId = new Uint8Array (
324+ requestIdStr . split ( "," ) . map ( ( x ) => Number . parseInt ( x ) ) ,
325+ ) . buffer ;
326+ this . #runner. sendWebsocketMessageAck ( requestId , messageIndex ) ;
327+ }
328+
329+ this . #wsAckQueue. clear ( ) ;
330+ }
331+
305332 getContext ( actorId : string ) : DriverContext {
306333 return { } ;
307334 }
@@ -554,13 +581,28 @@ export class EngineActorDriver implements ActorDriver {
554581
555582 invariant ( event . rivetRequestId , "missing rivetRequestId" ) ;
556583 invariant ( event . rivetMessageIndex , "missing rivetMessageIndex" ) ;
557- this . #runner. sendWebsocketMessageAck (
558- event . rivetRequestId ,
559- event . rivetMessageIndex ,
560- ) ;
584+
585+ // Track only the highest seen message index per request
586+ // Convert ArrayBuffer to string for Map key
587+ const currentMax = this . #wsAckQueue. get ( requestId ) ?? - 1n ;
588+ if ( event . rivetMessageIndex > currentMax ) {
589+ this . #wsAckQueue. set (
590+ requestId ,
591+ BigInt ( event . rivetMessageIndex ) ,
592+ ) ;
593+ } else {
594+ logger ( ) . warn ( {
595+ msg : "received lower index than ack queue for message" ,
596+ requestId : requestId ,
597+ queuedMessageIndex : currentMax ,
598+ eventMessageIndex : event . rivetMessageIndex ,
599+ } ) ;
600+ }
561601 } ) ;
562602
563603 websocket . addEventListener ( "close" , ( event ) => {
604+ // Flush any pending acks before closing
605+ this . #flushWsAcks( ) ;
564606 wsHandlerPromise . then ( ( x ) => x . onClose ?.( event , wsContext ) ) ;
565607 } ) ;
566608
@@ -575,6 +617,16 @@ export class EngineActorDriver implements ActorDriver {
575617
576618 async shutdownRunner ( immediate : boolean ) : Promise < void > {
577619 logger ( ) . info ( { msg : "stopping engine actor driver" } ) ;
620+
621+ // Clear the ack flush interval
622+ if ( this . #wsAckFlushInterval) {
623+ clearInterval ( this . #wsAckFlushInterval) ;
624+ this . #wsAckFlushInterval = undefined ;
625+ }
626+
627+ // Flush any remaining acks
628+ this . #flushWsAcks( ) ;
629+
578630 await this . #runner. shutdown ( immediate ) ;
579631 }
580632
0 commit comments