11import { Logger } from "@trigger.dev/core/logger" ;
22import { nanoid } from "nanoid" ;
33import pLimit from "p-limit" ;
4+ import { signalsEmitter } from "~/services/signals.server" ;
45
56export type DynamicFlushSchedulerConfig < T > = {
67 batchSize : number ;
@@ -22,6 +23,7 @@ export class DynamicFlushScheduler<T> {
2223 private readonly BATCH_SIZE : number ;
2324 private readonly FLUSH_INTERVAL : number ;
2425 private flushTimer : NodeJS . Timeout | null ;
26+ private metricsReporterTimer : NodeJS . Timeout | undefined ;
2527 private readonly callback : ( flushId : string , batch : T [ ] ) => Promise < void > ;
2628
2729 // New properties for dynamic scaling
@@ -41,6 +43,7 @@ export class DynamicFlushScheduler<T> {
4143 droppedEvents : 0 ,
4244 droppedEventsByKind : new Map < string , number > ( ) ,
4345 } ;
46+ private isShuttingDown : boolean = false ;
4447
4548 // New properties for load shedding
4649 private readonly loadSheddingThreshold : number ;
@@ -75,6 +78,7 @@ export class DynamicFlushScheduler<T> {
7578
7679 this . startFlushTimer ( ) ;
7780 this . startMetricsReporter ( ) ;
81+ this . setupShutdownHandlers ( ) ;
7882 }
7983
8084 addToBatch ( items : T [ ] ) : void {
@@ -119,8 +123,8 @@ export class DynamicFlushScheduler<T> {
119123 this . currentBatch . push ( ...itemsToAdd ) ;
120124 this . totalQueuedItems += itemsToAdd . length ;
121125
122- // Check if we need to create a batch
123- if ( this . currentBatch . length >= this . currentBatchSize ) {
126+ // Check if we need to create a batch (if we are shutting down, create a batch immediately because the flush timer is stopped)
127+ if ( this . currentBatch . length >= this . currentBatchSize || this . isShuttingDown ) {
124128 this . createBatch ( ) ;
125129 }
126130
@@ -137,6 +141,11 @@ export class DynamicFlushScheduler<T> {
137141 this . resetFlushTimer ( ) ;
138142 }
139143
144+ private setupShutdownHandlers ( ) : void {
145+ signalsEmitter . on ( "SIGTERM" , ( ) => this . shutdown ( ) ) ;
146+ signalsEmitter . on ( "SIGINT" , ( ) => this . shutdown ( ) ) ;
147+ }
148+
140149 private startFlushTimer ( ) : void {
141150 this . flushTimer = setInterval ( ( ) => this . checkAndFlush ( ) , this . FLUSH_INTERVAL ) ;
142151 }
@@ -145,6 +154,9 @@ export class DynamicFlushScheduler<T> {
145154 if ( this . flushTimer ) {
146155 clearInterval ( this . flushTimer ) ;
147156 }
157+
158+ if ( this . isShuttingDown ) return ;
159+
148160 this . startFlushTimer ( ) ;
149161 }
150162
@@ -226,7 +238,7 @@ export class DynamicFlushScheduler<T> {
226238 }
227239
228240 private lastConcurrencyAdjustment : number = Date . now ( ) ;
229-
241+
230242 private adjustConcurrency ( backOff : boolean = false ) : void {
231243 const currentConcurrency = this . limiter . concurrency ;
232244 let newConcurrency = currentConcurrency ;
@@ -281,7 +293,7 @@ export class DynamicFlushScheduler<T> {
281293
282294 private startMetricsReporter ( ) : void {
283295 // Report metrics every 30 seconds
284- setInterval ( ( ) => {
296+ this . metricsReporterTimer = setInterval ( ( ) => {
285297 const droppedByKind : Record < string , number > = { } ;
286298 this . metrics . droppedEventsByKind . forEach ( ( count , kind ) => {
287299 droppedByKind [ kind ] = count ;
@@ -356,10 +368,18 @@ export class DynamicFlushScheduler<T> {
356368
357369 // Graceful shutdown
358370 async shutdown ( ) : Promise < void > {
371+ if ( this . isShuttingDown ) return ;
372+
373+ this . isShuttingDown = true ;
374+
359375 if ( this . flushTimer ) {
360376 clearInterval ( this . flushTimer ) ;
361377 }
362378
379+ if ( this . metricsReporterTimer ) {
380+ clearInterval ( this . metricsReporterTimer ) ;
381+ }
382+
363383 // Flush any remaining items
364384 if ( this . currentBatch . length > 0 ) {
365385 this . createBatch ( ) ;
0 commit comments