Skip to content

Commit e218ef9

Browse files
committed
lazy logging, renamed private variables, defined telemetry endpoints as constants, added callback function while submitting request to executor, changed del to pop
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent bf92c6c commit e218ef9

File tree

3 files changed

+74
-49
lines changed

3 files changed

+74
-49
lines changed

src/databricks/sql/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ def read(self) -> Optional[OAuthToken]:
310310
host_url=self.host,
311311
)
312312

313-
self.telemetry_client = TelemetryClientFactory.get_telemetry_client(
313+
self._telemetry_client = TelemetryClientFactory.get_telemetry_client(
314314
connection_uuid=self.get_session_id_hex()
315315
)
316316

@@ -323,7 +323,7 @@ def read(self) -> Optional[OAuthToken]:
323323
socket_timeout=kwargs.get("_socket_timeout", None),
324324
)
325325

326-
self.telemetry_client.export_initial_telemetry_log(
326+
self._telemetry_client.export_initial_telemetry_log(
327327
driver_connection_params=driver_connection_params,
328328
user_agent=useragent_header,
329329
)
@@ -464,7 +464,7 @@ def _close(self, close_cursors=True) -> None:
464464

465465
self.open = False
466466

467-
self.telemetry_client.close()
467+
self._telemetry_client.close()
468468

469469
def commit(self):
470470
"""No-op because Databricks does not support transactions"""

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 67 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ class TelemetryClient(BaseTelemetryClient):
133133
It uses a thread pool to handle asynchronous operations, that it gets from the TelemetryClientFactory.
134134
"""
135135

136+
# Telemetry endpoint paths
137+
TELEMETRY_AUTHENTICATED_PATH = "/telemetry-ext"
138+
TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
139+
136140
def __init__(
137141
self,
138142
telemetry_enabled,
@@ -141,37 +145,37 @@ def __init__(
141145
host_url,
142146
executor,
143147
):
144-
logger.info(f"Initializing TelemetryClient for connection: {connection_uuid}")
145-
self.telemetry_enabled = telemetry_enabled
146-
self.batch_size = 10 # TODO: Decide on batch size
147-
self.connection_uuid = connection_uuid
148-
self.auth_provider = auth_provider
149-
self.user_agent = None
150-
self.events_batch = []
151-
self.lock = threading.Lock()
152-
self.driver_connection_params = None
153-
self.host_url = host_url
154-
self.executor = executor
148+
logger.debug("Initializing TelemetryClient for connection: %s", connection_uuid)
149+
self._telemetry_enabled = telemetry_enabled
150+
self._batch_size = 10 # TODO: Decide on batch size
151+
self._connection_uuid = connection_uuid
152+
self._auth_provider = auth_provider
153+
self._user_agent = None
154+
self._events_batch = []
155+
self._lock = threading.Lock()
156+
self._driver_connection_params = None
157+
self._host_url = host_url
158+
self._executor = executor
155159

156160
def export_event(self, event):
157161
"""Add an event to the batch queue and flush if batch is full"""
158-
logger.debug(f"Exporting event for connection {self.connection_uuid}")
159-
with self.lock:
160-
self.events_batch.append(event)
161-
if len(self.events_batch) >= self.batch_size:
162+
logger.debug("Exporting event for connection %s", self._connection_uuid)
163+
with self._lock:
164+
self._events_batch.append(event)
165+
if len(self._events_batch) >= self._batch_size:
162166
logger.debug(
163-
f"Batch size limit reached ({self.batch_size}), flushing events"
167+
"Batch size limit reached (%s), flushing events", self._batch_size
164168
)
165169
self.flush()
166170

167171
def flush(self):
168172
"""Flush the current batch of events to the server"""
169-
with self.lock:
170-
events_to_flush = self.events_batch.copy()
171-
self.events_batch = []
173+
with self._lock:
174+
events_to_flush = self._events_batch.copy()
175+
self._events_batch = []
172176

173177
if events_to_flush:
174-
logger.info(f"Flushing {len(events_to_flush)} telemetry events to server")
178+
logger.debug("Flushing %s telemetry events to server", len(events_to_flush))
175179
self._send_telemetry(events_to_flush)
176180

177181
def _send_telemetry(self, events):
@@ -183,46 +187,68 @@ def _send_telemetry(self, events):
183187
"protoLogs": [event.to_json() for event in events],
184188
}
185189

186-
path = "/telemetry-ext" if self.auth_provider else "/telemetry-unauth"
187-
url = f"https://{self.host_url}{path}"
190+
path = (
191+
self.TELEMETRY_AUTHENTICATED_PATH
192+
if self._auth_provider
193+
else self.TELEMETRY_UNAUTHENTICATED_PATH
194+
)
195+
url = f"https://{self._host_url}{path}"
188196

189197
headers = {"Accept": "application/json", "Content-Type": "application/json"}
190198

191-
if self.auth_provider:
192-
self.auth_provider.add_headers(headers)
199+
if self._auth_provider:
200+
self._auth_provider.add_headers(headers)
193201

194202
try:
195203
logger.debug("Submitting telemetry request to thread pool")
196-
self.executor.submit(
204+
future = self._executor.submit(
197205
requests.post,
198206
url,
199207
data=json.dumps(request),
200208
headers=headers,
201209
timeout=10,
202210
)
211+
future.add_done_callback(self._telemetry_request_callback)
203212
except Exception as e:
204-
logger.error(f"Failed to submit telemetry request: {e}")
213+
logger.debug("Failed to submit telemetry request: %s", e)
214+
215+
def _telemetry_request_callback(self, future):
216+
"""Callback function to handle telemetry request completion"""
217+
try:
218+
response = future.result()
219+
220+
if response.status_code == 200:
221+
logger.debug("Telemetry request completed successfully")
222+
else:
223+
logger.debug(
224+
"Telemetry request failed with status code: %s",
225+
response.status_code,
226+
)
227+
228+
except Exception as e:
229+
logger.debug("Telemetry request failed with exception: %s", e)
205230

206231
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
207-
logger.info(
208-
f"Exporting initial telemetry log for connection {self.connection_uuid}"
232+
logger.debug(
233+
"Exporting initial telemetry log for connection %s", self._connection_uuid
209234
)
210235

211-
self.driver_connection_params = driver_connection_params
212-
self.user_agent = user_agent
236+
self._driver_connection_params = driver_connection_params
237+
self._user_agent = user_agent
213238

214239
telemetry_frontend_log = TelemetryFrontendLog(
215240
frontend_log_event_id=str(uuid.uuid4()),
216241
context=FrontendLogContext(
217242
client_context=TelemetryClientContext(
218-
timestamp_millis=int(time.time() * 1000), user_agent=self.user_agent
243+
timestamp_millis=int(time.time() * 1000),
244+
user_agent=self._user_agent,
219245
)
220246
),
221247
entry=FrontendLogEntry(
222248
sql_driver_log=TelemetryEvent(
223-
session_id=self.connection_uuid,
249+
session_id=self._connection_uuid,
224250
system_configuration=TelemetryHelper.getDriverSystemConfiguration(),
225-
driver_connection_params=self.driver_connection_params,
251+
driver_connection_params=self._driver_connection_params,
226252
)
227253
),
228254
)
@@ -231,9 +257,9 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
231257

232258
def close(self):
233259
"""Flush remaining events before closing"""
234-
logger.info(f"Closing TelemetryClient for connection {self.connection_uuid}")
260+
logger.debug("Closing TelemetryClient for connection %s", self._connection_uuid)
235261
self.flush()
236-
TelemetryClientFactory.close(self.connection_uuid)
262+
TelemetryClientFactory.close(self._connection_uuid)
237263

238264

239265
class TelemetryClientFactory:
@@ -254,7 +280,6 @@ def _initialize(cls):
254280
"""Initialize the factory if not already initialized"""
255281
with cls._lock:
256282
if not cls._initialized:
257-
logger.info("Initializing TelemetryClientFactory")
258283
cls._clients = {}
259284
cls._executor = ThreadPoolExecutor(
260285
max_workers=10
@@ -276,8 +301,8 @@ def initialize_telemetry_client(
276301

277302
with TelemetryClientFactory._lock:
278303
if connection_uuid not in TelemetryClientFactory._clients:
279-
logger.info(
280-
f"Creating new TelemetryClient for connection {connection_uuid}"
304+
logger.debug(
305+
"Creating new TelemetryClient for connection %s", connection_uuid
281306
)
282307
if telemetry_enabled:
283308
TelemetryClientFactory._clients[connection_uuid] = TelemetryClient(
@@ -299,7 +324,7 @@ def get_telemetry_client(connection_uuid):
299324
return TelemetryClientFactory._clients[connection_uuid]
300325
else:
301326
logger.error(
302-
f"Telemetry client not initialized for connection {connection_uuid}"
327+
"Telemetry client not initialized for connection %s", connection_uuid
303328
)
304329
return NoopTelemetryClient()
305330

@@ -310,13 +335,13 @@ def close(connection_uuid):
310335
with TelemetryClientFactory._lock:
311336
if connection_uuid in TelemetryClientFactory._clients:
312337
logger.debug(
313-
f"Removing telemetry client for connection {connection_uuid}"
338+
"Removing telemetry client for connection %s", connection_uuid
314339
)
315-
del TelemetryClientFactory._clients[connection_uuid]
340+
TelemetryClientFactory._clients.pop(connection_uuid, None)
316341

317342
# Shutdown executor if no more clients
318343
if not TelemetryClientFactory._clients and TelemetryClientFactory._executor:
319-
logger.info(
344+
logger.debug(
320345
"No more telemetry clients, shutting down thread pool executor"
321346
)
322347
TelemetryClientFactory._executor.shutdown(wait=True)

tests/unit/test_telemetry.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ def test_export_event(self):
9393
self.client.export_event(f"event-{i}")
9494

9595
self.client.flush.assert_not_called()
96-
self.assertEqual(len(self.client.events_batch), 5)
96+
self.assertEqual(len(self.client._events_batch), 5)
9797

9898
for i in range(5, 10):
9999
self.client.export_event(f"event-{i}")
100100

101101
self.client.flush.assert_called_once()
102-
self.assertEqual(len(self.client.events_batch), 10)
102+
self.assertEqual(len(self.client._events_batch), 10)
103103

104104
@patch("requests.post")
105105
def test_send_telemetry_authenticated(self, mock_post):
@@ -144,13 +144,13 @@ def test_send_telemetry_unauthenticated(self, mock_post):
144144

145145
def test_flush(self):
146146
"""Test flushing events."""
147-
self.client.events_batch = ["event1", "event2"]
147+
self.client._events_batch = ["event1", "event2"]
148148
self.client._send_telemetry = MagicMock()
149149

150150
self.client.flush()
151151

152152
self.client._send_telemetry.assert_called_once_with(["event1", "event2"])
153-
self.assertEqual(self.client.events_batch, [])
153+
self.assertEqual(self.client._events_batch, [])
154154

155155
@patch("databricks.sql.telemetry.telemetry_client.TelemetryClientFactory")
156156
def test_close(self, mock_factory_class):

0 commit comments

Comments
 (0)