Skip to content

Commit 3dc222f

Browse files
committed
Fix telemetry loss during Python shutdown
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent a8406f8 commit 3dc222f

File tree

1 file changed

+49
-0
lines changed

1 file changed

+49
-0
lines changed

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ class TelemetryClientFactory:
308308
_executor: Optional[ThreadPoolExecutor] = None
309309
_initialized: bool = False
310310
_lock = threading.Lock() # Thread safety for factory operations
311+
_original_excepthook = None
312+
_excepthook_installed = False
311313

312314
@classmethod
313315
def _initialize(cls):
@@ -318,11 +320,58 @@ def _initialize(cls):
318320
cls._executor = ThreadPoolExecutor(
319321
max_workers=10
320322
) # Thread pool for async operations TODO: Decide on max workers
323+
cls._install_exception_hook()
321324
cls._initialized = True
322325
logger.debug(
323326
"TelemetryClientFactory initialized with thread pool (max_workers=10)"
324327
)
325328

329+
@classmethod
330+
def _install_exception_hook(cls):
331+
"""Install global exception handler for unhandled exceptions"""
332+
if not cls._excepthook_installed:
333+
import sys
334+
335+
cls._original_excepthook = sys.excepthook
336+
sys.excepthook = cls._handle_unhandled_exception
337+
cls._excepthook_installed = True
338+
logger.debug("Global exception handler installed for telemetry")
339+
340+
@classmethod
341+
def _handle_unhandled_exception(cls, exc_type, exc_value, exc_traceback):
342+
"""Handle unhandled exceptions by sending telemetry and flushing thread pool"""
343+
logger.debug("Handling unhandled exception: %s", exc_type.__name__)
344+
345+
try:
346+
# Flush existing thread pool work and wait for completion
347+
logger.debug(
348+
"Flushing pending telemetry and waiting for thread pool completion..."
349+
)
350+
for uuid, client in cls._clients.items():
351+
if hasattr(client, "flush"):
352+
try:
353+
client.flush() # Submit any pending events
354+
except Exception as e:
355+
logger.debug(
356+
"Failed to flush telemetry for connection %s: %s", uuid, e
357+
)
358+
359+
if cls._executor:
360+
try:
361+
cls._executor.shutdown(
362+
wait=True
363+
) # This waits for all submitted work to complete
364+
logger.debug("Thread pool shutdown completed successfully")
365+
except Exception as e:
366+
logger.debug("Thread pool shutdown failed: %s", e)
367+
368+
except Exception as e:
369+
logger.debug("Exception in excepthook telemetry handler: %s", e)
370+
371+
# Call the original exception handler to maintain normal behavior
372+
if cls._original_excepthook:
373+
cls._original_excepthook(exc_type, exc_value, exc_traceback)
374+
326375
@staticmethod
327376
def initialize_telemetry_client(
328377
telemetry_enabled,

0 commit comments

Comments
 (0)