Skip to content

Commit 0da893f

Browse files
committed
batch size configurable, flush interval
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 6748c2c commit 0da893f

File tree

3 files changed

+50
-9
lines changed

3 files changed

+50
-9
lines changed

src/databricks/sql/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ def read(self) -> Optional[OAuthToken]:
250250
self.telemetry_enabled = (
251251
self.client_telemetry_enabled and self.server_telemetry_enabled
252252
)
253+
self.telemetry_batch_size = kwargs.get("telemetry_batch_size")
253254

254255
user_agent_entry = kwargs.get("user_agent_entry")
255256
if user_agent_entry is None:
@@ -311,6 +312,7 @@ def read(self) -> Optional[OAuthToken]:
311312
session_id_hex=self.get_session_id_hex(),
312313
auth_provider=auth_provider,
313314
host_url=self.host,
315+
batch_size=self.telemetry_batch_size,
314316
)
315317

316318
self._telemetry_client = TelemetryClientFactory.get_telemetry_client(

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,22 @@ class TelemetryClient(BaseTelemetryClient):
150150
TELEMETRY_AUTHENTICATED_PATH = "/telemetry-ext"
151151
TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
152152

153+
DEFAULT_BATCH_SIZE = 100
154+
DEFAULT_FLUSH_INTERVAL_SECONDS = 90
155+
153156
def __init__(
154157
self,
155158
telemetry_enabled,
156159
session_id_hex,
157160
auth_provider,
158161
host_url,
159162
executor,
163+
batch_size=None,
160164
):
161165
logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
162166
self._telemetry_enabled = telemetry_enabled
163-
self._batch_size = 10 # TODO: Decide on batch size
167+
self._batch_size = batch_size if batch_size is not None else self.DEFAULT_BATCH_SIZE
168+
self._flush_interval_seconds = self.DEFAULT_FLUSH_INTERVAL_SECONDS
164169
self._session_id_hex = session_id_hex
165170
self._auth_provider = auth_provider
166171
self._user_agent = None
@@ -169,9 +174,41 @@ def __init__(
169174
self._driver_connection_params = None
170175
self._host_url = host_url
171176
self._executor = executor
177+
self._flush_timer = None
178+
179+
# Start the periodic flush timer
180+
self._start_flush_timer()
181+
182+
def _start_flush_timer(self):
183+
"""Start the periodic flush timer"""
184+
185+
self._flush_timer = threading.Timer(
186+
self._flush_interval_seconds,
187+
self._periodic_flush
188+
)
189+
self._flush_timer.daemon = True # Don't prevent program exit
190+
self._flush_timer.start()
191+
logger.debug("Started flush timer for connection %s (interval: %d seconds)",
192+
self._session_id_hex, self._flush_interval_seconds)
193+
194+
def _periodic_flush(self):
195+
"""Periodic flush callback - flushes events and reschedules the timer"""
196+
197+
logger.debug("Performing periodic flush for connection %s", self._session_id_hex)
198+
self._flush()
199+
# Reschedule the next flush
200+
self._start_flush_timer()
201+
202+
def _stop_flush_timer(self):
203+
"""Stop the periodic flush timer"""
204+
if self._flush_timer is not None:
205+
self._flush_timer.cancel()
206+
self._flush_timer = None
207+
logger.debug("Stopped flush timer for connection %s", self._session_id_hex)
172208

173209
def _export_event(self, event):
174210
"""Add an event to the batch queue and flush if batch is full"""
211+
175212
logger.debug("Exporting event for connection %s", self._session_id_hex)
176213
with self._lock:
177214
self._events_batch.append(event)
@@ -183,6 +220,7 @@ def _export_event(self, event):
183220

184221
def _flush(self):
185222
"""Flush the current batch of events to the server"""
223+
186224
with self._lock:
187225
events_to_flush = self._events_batch.copy()
188226
self._events_batch = []
@@ -300,8 +338,9 @@ def export_failure_log(self, error_name, error_message):
300338
logger.debug("Failed to export failure log: %s", e)
301339

302340
def close(self):
303-
"""Flush remaining events before closing"""
341+
"""Flush remaining events and stop timer before closing"""
304342
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
343+
self._stop_flush_timer()
305344
self._flush()
306345

307346

@@ -364,6 +403,7 @@ def initialize_telemetry_client(
364403
session_id_hex,
365404
auth_provider,
366405
host_url,
406+
batch_size=None,
367407
):
368408
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
369409
try:
@@ -385,6 +425,7 @@ def initialize_telemetry_client(
385425
auth_provider=auth_provider,
386426
host_url=host_url,
387427
executor=TelemetryClientFactory._executor,
428+
batch_size=batch_size,
388429
)
389430
else:
390431
TelemetryClientFactory._clients[

tests/unit/test_telemetry.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,22 +181,20 @@ def test_export_failure_log(
181181

182182
client._export_event.assert_called_once_with(mock_frontend_log.return_value)
183183

184-
def test_export_event(self, telemetry_client_setup):
185-
"""Test exporting an event."""
184+
def test_batch_size_flush(self, telemetry_client_setup):
185+
"""Test batch size flush."""
186186
client = telemetry_client_setup["client"]
187187
client._flush = MagicMock()
188188

189-
for i in range(5):
189+
for i in range(TelemetryClient._batch_size-1):
190190
client._export_event(f"event-{i}")
191191

192192
client._flush.assert_not_called()
193-
assert len(client._events_batch) == 5
193+
assert len(client._events_batch) == TelemetryClient._batch_size-1
194194

195-
for i in range(5, 10):
196-
client._export_event(f"event-{i}")
195+
client._export_event(f"event-{TelemetryClient._batch_size - 1}")
197196

198197
client._flush.assert_called_once()
199-
assert len(client._events_batch) == 10
200198

201199
@patch("requests.post")
202200
def test_send_telemetry_authenticated(self, mock_post, telemetry_client_setup):

0 commit comments

Comments
 (0)