@@ -116,6 +116,10 @@ def export_failure_log(self, error_name, error_message):
116116 def close (self ):
117117 raise NotImplementedError ("Subclasses must implement close" )
118118
119+ @abstractmethod
120+ def flush (self ):
121+ raise NotImplementedError ("Subclasses must implement flush" )
122+
119123
120124class NoopTelemetryClient (BaseTelemetryClient ):
121125 """
@@ -139,6 +143,9 @@ def export_failure_log(self, error_name, error_message):
139143 def close (self ):
140144 pass
141145
146+ def flush (self ):
147+ pass
148+
142149
143150class TelemetryClient (BaseTelemetryClient ):
144151 """
@@ -151,7 +158,6 @@ class TelemetryClient(BaseTelemetryClient):
151158 TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
152159
153160 DEFAULT_BATCH_SIZE = 100
154- DEFAULT_FLUSH_INTERVAL_SECONDS = 90
155161
156162 def __init__ (
157163 self ,
@@ -167,7 +173,6 @@ def __init__(
167173 self ._batch_size = (
168174 batch_size if batch_size is not None else self .DEFAULT_BATCH_SIZE
169175 )
170- self ._flush_interval_seconds = self .DEFAULT_FLUSH_INTERVAL_SECONDS
171176 self ._session_id_hex = session_id_hex
172177 self ._auth_provider = auth_provider
173178 self ._user_agent = None
@@ -177,41 +182,6 @@ def __init__(
177182 self ._host_url = host_url
178183 self ._executor = executor
179184
180- # Background thread for periodic flushing
181- self ._flush_stop_event = threading .Event ()
182- self ._flush_thread = None
183-
184- # Start the periodic flush thread
185- self ._start_flush_thread ()
186-
187- def _start_flush_thread (self ):
188- """Start the background thread for periodic flushing"""
189- self ._flush_thread = threading .Thread (target = self ._flush_worker , daemon = True )
190- self ._flush_thread .start ()
191- logger .debug (
192- "Started flush thread for connection %s (interval: %d seconds)" ,
193- self ._session_id_hex ,
194- self ._flush_interval_seconds ,
195- )
196-
197- def _flush_worker (self ):
198- """Background worker thread for periodic flushing"""
199- while not self ._flush_stop_event .wait (self ._flush_interval_seconds ):
200- logger .debug (
201- "Performing periodic flush for connection %s" , self ._session_id_hex
202- )
203- self ._flush ()
204-
205- def _stop_flush_thread (self ):
206- """Stop the background flush thread"""
207- if self ._flush_thread is not None :
208- self ._flush_stop_event .set ()
209- self ._flush_thread .join (
210- timeout = 1.0
211- ) # Wait up to 1 second for graceful shutdown
212- self ._flush_thread = None
213- logger .debug ("Stopped flush thread for connection %s" , self ._session_id_hex )
214-
215185 def _export_event (self , event ):
216186 """Add an event to the batch queue and flush if batch is full"""
217187
@@ -222,9 +192,9 @@ def _export_event(self, event):
222192 logger .debug (
223193 "Batch size limit reached (%s), flushing events" , self ._batch_size
224194 )
225- self ._flush ()
195+ self .flush ()
226196
227- def _flush (self ):
197+ def flush (self ):
228198 """Flush the current batch of events to the server"""
229199
230200 with self ._lock :
@@ -344,16 +314,15 @@ def export_failure_log(self, error_name, error_message):
344314 logger .debug ("Failed to export failure log: %s" , e )
345315
346316 def close (self ):
347- """Flush remaining events and stop timer before closing"""
317+ """Flush remaining events before closing"""
348318 logger .debug ("Closing TelemetryClient for connection %s" , self ._session_id_hex )
349- self ._stop_flush_thread ()
350- self ._flush ()
319+ self .flush ()
351320
352321
353322class TelemetryClientFactory :
354323 """
355324 Static factory class for creating and managing telemetry clients.
356- It uses a thread pool to handle asynchronous operations.
325+ It uses a thread pool to handle asynchronous operations and a single flush thread for all clients .
357326 """
358327
359328 _clients : Dict [
@@ -366,6 +335,11 @@ class TelemetryClientFactory:
366335 _original_excepthook = None
367336 _excepthook_installed = False
368337
338+ # Shared flush thread for all clients
339+ _flush_thread = None
340+ _flush_event = threading .Event ()
341+ _flush_interval_seconds = 90
342+
369343 @classmethod
370344 def _initialize (cls ):
371345 """Initialize the factory if not already initialized"""
@@ -376,11 +350,42 @@ def _initialize(cls):
376350 max_workers = 10
377351 ) # Thread pool for async operations TODO: Decide on max workers
378352 cls ._install_exception_hook ()
353+ cls ._start_flush_thread ()
379354 cls ._initialized = True
380355 logger .debug (
381- "TelemetryClientFactory initialized with thread pool (max_workers=10)"
356+ "TelemetryClientFactory initialized with thread pool (max_workers=10) and shared flush thread "
382357 )
383358
359+ @classmethod
360+ def _start_flush_thread (cls ):
361+ """Start the shared background thread for periodic flushing of all clients"""
362+ cls ._flush_event .clear ()
363+ cls ._flush_thread = threading .Thread (target = cls ._flush_worker , daemon = True )
364+ cls ._flush_thread .start ()
365+
366+ @classmethod
367+ def _flush_worker (cls ):
368+ """Background worker thread for periodic flushing of all clients"""
369+ while not cls ._flush_event .wait (cls ._flush_interval_seconds ):
370+ logger .debug ("Performing periodic flush for all telemetry clients" )
371+
372+ with cls ._lock :
373+ clients_to_flush = list (cls ._clients .values ())
374+
375+ for client in clients_to_flush :
376+ try :
377+ client .flush ()
378+ except Exception as e :
379+ logger .debug ("Failed to flush telemetry client: %s" , e )
380+
381+ @classmethod
382+ def _stop_flush_thread (cls ):
383+ """Stop the shared background flush thread"""
384+ if cls ._flush_thread is not None :
385+ cls ._flush_event .set ()
386+ cls ._flush_thread .join (timeout = 1.0 )
387+ cls ._flush_thread = None
388+
384389 @classmethod
385390 def _install_exception_hook (cls ):
386391 """Install global exception handler for unhandled exceptions"""
@@ -473,11 +478,12 @@ def close(session_id_hex):
473478 )
474479 telemetry_client .close ()
475480
476- # Shutdown executor if no more clients
481+ # Shutdown executor and flush thread if no more clients
477482 if not TelemetryClientFactory ._clients and TelemetryClientFactory ._executor :
478483 logger .debug (
479- "No more telemetry clients, shutting down thread pool executor"
484+ "No more telemetry clients, shutting down thread pool executor and flush thread "
480485 )
486+ TelemetryClientFactory ._stop_flush_thread ()
481487 TelemetryClientFactory ._executor .shutdown (wait = True )
482488 TelemetryClientFactory ._executor = None
483489 TelemetryClientFactory ._initialized = False
0 commit comments