22import time
33import json
44import requests
5+ import logging
56from concurrent .futures import ThreadPoolExecutor
67from databricks .sql .telemetry .models .event import (
78 TelemetryEvent ,
2627from abc import ABC , abstractmethod
2728from databricks .sql import __version__
2829
30+ logger = logging .getLogger (__name__ )
31+
2932
3033class TelemetryHelper :
3134 """Helper class for getting telemetry related information."""
@@ -145,6 +148,7 @@ def __init__(
145148 driver_connection_params ,
146149 executor ,
147150 ):
151+ logger .info (f"Initializing TelemetryClient for connection: { connection_uuid } " )
148152 self .telemetry_enabled = telemetry_enabled
149153 self .batch_size = 10 # TODO: Decide on batch size
150154 self .connection_uuid = connection_uuid
@@ -158,9 +162,13 @@ def __init__(
158162
159163 def export_event (self , event ):
160164 """Add an event to the batch queue and flush if batch is full"""
165+ logger .debug (f"Exporting event for connection { self .connection_uuid } " )
161166 with self .lock :
162167 self .events_batch .append (event )
163168 if len (self .events_batch ) >= self .batch_size :
169+ logger .debug (
170+ f"Batch size limit reached ({ self .batch_size } ), flushing events"
171+ )
164172 self .flush ()
165173
166174 def flush (self ):
@@ -170,6 +178,7 @@ def flush(self):
170178 self .events_batch = []
171179
172180 if events_to_flush :
181+ logger .info (f"Flushing { len (events_to_flush )} telemetry events to server" )
173182 self ._send_telemetry (events_to_flush )
174183
175184 def _send_telemetry (self , events ):
@@ -189,11 +198,22 @@ def _send_telemetry(self, events):
189198 if self .auth_provider :
190199 self .auth_provider .add_headers (headers )
191200
192- self .executor .submit (
193- requests .post , url , data = json .dumps (request ), headers = headers , timeout = 10
194- )
201+ try :
202+ logger .debug ("Submitting telemetry request to thread pool" )
203+ self .executor .submit (
204+ requests .post ,
205+ url ,
206+ data = json .dumps (request ),
207+ headers = headers ,
208+ timeout = 10 ,
209+ )
210+ except Exception as e :
211+ logger .error (f"Failed to submit telemetry request: { e } " )
195212
196213 def export_initial_telemetry_log (self ):
214+ logger .info (
215+ f"Exporting initial telemetry log for connection { self .connection_uuid } "
216+ )
197217
198218 telemetry_frontend_log = TelemetryFrontendLog (
199219 frontend_log_event_id = str (uuid .uuid4 ()),
@@ -215,6 +235,7 @@ def export_initial_telemetry_log(self):
215235
216236 def close (self ):
217237 """Flush remaining events before closing"""
238+ logger .info (f"Closing TelemetryClient for connection { self .connection_uuid } " )
218239 self .flush ()
219240 telemetry_client_factory .close (self .connection_uuid )
220241
@@ -275,10 +296,12 @@ def get_telemetry_client(self, connection_uuid):
275296
276297 def close (self , connection_uuid ):
277298 if connection_uuid in self ._clients :
299+ logger .debug (f"Removing telemetry client for connection { connection_uuid } " )
278300 del self ._clients [connection_uuid ]
279301
280302 # Shutdown executor if no more clients
281303 if not self ._clients :
304+ logger .info ("No more telemetry clients, shutting down thread pool executor" )
282305 self .executor .shutdown (wait = True )
283306
284307
0 commit comments