Skip to content

Commit 6f924ca

Browse files
committed
changed export_event and flush to private functions
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 53db85d commit 6f924ca

File tree

3 files changed

+17
-47
lines changed

3 files changed

+17
-47
lines changed

src/databricks/sql/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ def _close(self, close_cursors=True) -> None:
467467

468468
self.open = False
469469

470-
self._telemetry_client.close()
470+
TelemetryClientFactory.close(self.get_session_id_hex())
471471

472472
def commit(self):
473473
"""No-op because Databricks does not support transactions"""

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,6 @@ class BaseTelemetryClient(ABC):
102102
It is used to define the interface for telemetry clients.
103103
"""
104104

105-
@abstractmethod
106-
def export_event(self, event):
107-
pass
108-
109-
@abstractmethod
110-
def flush(self):
111-
pass
112-
113105
@abstractmethod
114106
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
115107
pass
@@ -136,12 +128,6 @@ def __new__(cls):
136128
cls._instance = super(NoopTelemetryClient, cls).__new__(cls)
137129
return cls._instance
138130

139-
def export_event(self, event):
140-
pass
141-
142-
def flush(self):
143-
pass
144-
145131
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
146132
pass
147133

@@ -182,7 +168,7 @@ def __init__(
182168
self._host_url = host_url
183169
self._executor = executor
184170

185-
def export_event(self, event):
171+
def _export_event(self, event):
186172
"""Add an event to the batch queue and flush if batch is full"""
187173
logger.debug("Exporting event for connection %s", self._connection_uuid)
188174
with self._lock:
@@ -191,9 +177,9 @@ def export_event(self, event):
191177
logger.debug(
192178
"Batch size limit reached (%s), flushing events", self._batch_size
193179
)
194-
self.flush()
180+
self._flush()
195181

196-
def flush(self):
182+
def _flush(self):
197183
"""Flush the current batch of events to the server"""
198184
with self._lock:
199185
events_to_flush = self._events_batch.copy()
@@ -313,11 +299,7 @@ def export_failure_log(self, error_name, error_message):
313299
def close(self):
314300
"""Flush remaining events before closing"""
315301
logger.debug("Closing TelemetryClient for connection %s", self._connection_uuid)
316-
try:
317-
self.flush()
318-
TelemetryClientFactory.close(self._connection_uuid)
319-
except Exception as e:
320-
logger.debug("Failed to close telemetry client: %s", e)
302+
self._flush()
321303

322304

323305
class TelemetryClientFactory:
@@ -365,20 +347,8 @@ def _handle_unhandled_exception(cls, exc_type, exc_value, exc_traceback):
365347
logger.debug("Handling unhandled exception: %s", exc_type.__name__)
366348

367349
# Flush existing thread pool work and wait for completion
368-
logger.debug(
369-
"Flushing pending telemetry and waiting for thread pool completion..."
370-
)
371-
for client in cls._clients.items():
372-
client.flush() # Submit any pending events
373-
374-
if cls._executor:
375-
try:
376-
cls._executor.shutdown(
377-
wait=True
378-
) # This waits for all submitted work to complete
379-
logger.debug("Thread pool shutdown completed successfully")
380-
except Exception as e:
381-
logger.debug("Thread pool shutdown failed: %s", e)
350+
for uuid, _ in cls._clients.items():
351+
cls.close(uuid)
382352

383353
# Call the original exception handler to maintain normal behavior
384354
if cls._original_excepthook:
@@ -445,6 +415,7 @@ def close(connection_uuid):
445415
logger.debug(
446416
"Removing telemetry client for connection %s", connection_uuid
447417
)
418+
TelemetryClientFactory.get_telemetry_client(connection_uuid).close()
448419
TelemetryClientFactory._clients.pop(connection_uuid, None)
449420

450421
# Shutdown executor if no more clients
@@ -455,3 +426,4 @@ def close(connection_uuid):
455426
TelemetryClientFactory._executor.shutdown(wait=True)
456427
TelemetryClientFactory._executor = None
457428
TelemetryClientFactory._initialized = False
429+

tests/unit/test_telemetry.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,18 +177,18 @@ def test_export_failure_log(
177177
def test_export_event(self, telemetry_client_setup):
178178
"""Test exporting an event."""
179179
client = telemetry_client_setup["client"]
180-
client.flush = MagicMock()
180+
client._flush = MagicMock()
181181

182182
for i in range(5):
183-
client.export_event(f"event-{i}")
183+
client._export_event(f"event-{i}")
184184

185-
client.flush.assert_not_called()
185+
client._flush.assert_not_called()
186186
assert len(client._events_batch) == 5
187187

188188
for i in range(5, 10):
189-
client.export_event(f"event-{i}")
189+
client._export_event(f"event-{i}")
190190

191-
client.flush.assert_called_once()
191+
client._flush.assert_called_once()
192192
assert len(client._events_batch) == 10
193193

194194
@patch("requests.post")
@@ -244,7 +244,7 @@ def test_flush(self, telemetry_client_setup):
244244
client._events_batch = ["event1", "event2"]
245245
client._send_telemetry = MagicMock()
246246

247-
client.flush()
247+
client._flush()
248248

249249
client._send_telemetry.assert_called_once_with(["event1", "event2"])
250250
assert client._events_batch == []
@@ -253,13 +253,11 @@ def test_flush(self, telemetry_client_setup):
253253
def test_close(self, mock_factory_class, telemetry_client_setup):
254254
"""Test closing the client."""
255255
client = telemetry_client_setup["client"]
256-
connection_uuid = telemetry_client_setup["connection_uuid"]
257-
client.flush = MagicMock()
256+
client._flush = MagicMock()
258257

259258
client.close()
260259

261-
client.flush.assert_called_once()
262-
mock_factory_class.close.assert_called_once_with(connection_uuid)
260+
client._flush.assert_called_once()
263261

264262

265263
class TestTelemetryClientFactory:

0 commit comments

Comments
 (0)