Skip to content

Commit 4624458

Browse files
samikshya-dbclaude
andcommitted
Pass session_id parameter to telemetry export methods
With host-level telemetry batching, multiple connections share one TelemetryClient. Each client stores session_id_hex from the first connection that created it. This caused all subsequent connections' telemetry events to use the wrong session ID. Changes: - Modified telemetry export method signatures to accept optional session_id - Updated Connection.export_initial_telemetry_log() to pass session_id - Updated latency_logger.py export_latency_log() to pass session_id - Updated Error.__init__() to accept optional session_id_hex and pass it - Updated all error raises in Connection and Cursor to pass session_id_hex 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 0b3dd82 commit 4624458

File tree

4 files changed

+60
-13
lines changed

4 files changed

+60
-13
lines changed

src/databricks/sql/client.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ def read(self) -> Optional[OAuthToken]:
391391
self._telemetry_client.export_initial_telemetry_log(
392392
driver_connection_params=driver_connection_params,
393393
user_agent=self.session.useragent_header,
394+
session_id=self.get_session_id_hex(),
394395
)
395396

396397
def _set_use_inline_params_with_warning(self, value: Union[bool, str]):
@@ -495,6 +496,7 @@ def cursor(
495496
raise InterfaceError(
496497
"Cannot create cursor from closed connection",
497498
host_url=self.session.host,
499+
session_id_hex=self.get_session_id_hex(),
498500
)
499501

500502
cursor = Cursor(
@@ -547,6 +549,7 @@ def autocommit(self) -> bool:
547549
raise InterfaceError(
548550
"Cannot get autocommit on closed connection",
549551
host_url=self.session.host,
552+
session_id_hex=self.get_session_id_hex(),
550553
)
551554

552555
if self._fetch_autocommit_from_server:
@@ -579,6 +582,7 @@ def autocommit(self, value: bool) -> None:
579582
raise InterfaceError(
580583
"Cannot set autocommit on closed connection",
581584
host_url=self.session.host,
585+
session_id_hex=self.get_session_id_hex(),
582586
)
583587

584588
# Create internal cursor for transaction control
@@ -601,6 +605,7 @@ def autocommit(self, value: bool) -> None:
601605
"autocommit_value": value,
602606
},
603607
host_url=self.session.host,
608+
session_id_hex=self.get_session_id_hex(),
604609
) from e
605610
finally:
606611
if cursor:
@@ -628,6 +633,7 @@ def _fetch_autocommit_state_from_server(self) -> bool:
628633
"No result returned from SET AUTOCOMMIT query",
629634
context={"operation": "fetch_autocommit"},
630635
host_url=self.session.host,
636+
session_id_hex=self.get_session_id_hex(),
631637
)
632638

633639
# Parse value (first column should be "true" or "false")
@@ -648,6 +654,7 @@ def _fetch_autocommit_state_from_server(self) -> bool:
648654
f"Failed to fetch autocommit state from server: {e.message}",
649655
context={**e.context, "operation": "fetch_autocommit"},
650656
host_url=self.session.host,
657+
session_id_hex=self.get_session_id_hex(),
651658
) from e
652659
finally:
653660
if cursor:
@@ -681,6 +688,7 @@ def commit(self) -> None:
681688
raise InterfaceError(
682689
"Cannot commit on closed connection",
683690
host_url=self.session.host,
691+
session_id_hex=self.get_session_id_hex(),
684692
)
685693

686694
cursor = None
@@ -693,6 +701,7 @@ def commit(self) -> None:
693701
f"Failed to commit transaction: {e.message}",
694702
context={**e.context, "operation": "commit"},
695703
host_url=self.session.host,
704+
session_id_hex=self.get_session_id_hex(),
696705
) from e
697706
finally:
698707
if cursor:
@@ -726,12 +735,14 @@ def rollback(self) -> None:
726735
raise NotSupportedError(
727736
"Transactions are not supported on Databricks",
728737
host_url=self.session.host,
738+
session_id_hex=self.get_session_id_hex(),
729739
)
730740

731741
if not self.open:
732742
raise InterfaceError(
733743
"Cannot rollback on closed connection",
734744
host_url=self.session.host,
745+
session_id_hex=self.get_session_id_hex(),
735746
)
736747

737748
cursor = None
@@ -744,6 +755,7 @@ def rollback(self) -> None:
744755
f"Failed to rollback transaction: {e.message}",
745756
context={**e.context, "operation": "rollback"},
746757
host_url=self.session.host,
758+
session_id_hex=self.get_session_id_hex(),
747759
) from e
748760
finally:
749761
if cursor:
@@ -768,6 +780,7 @@ def get_transaction_isolation(self) -> str:
768780
raise InterfaceError(
769781
"Cannot get transaction isolation on closed connection",
770782
host_url=self.session.host,
783+
session_id_hex=self.get_session_id_hex(),
771784
)
772785

773786
return TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ
@@ -794,6 +807,7 @@ def set_transaction_isolation(self, level: str) -> None:
794807
raise InterfaceError(
795808
"Cannot set transaction isolation on closed connection",
796809
host_url=self.session.host,
810+
session_id_hex=self.get_session_id_hex(),
797811
)
798812

799813
# Normalize and validate isolation level
@@ -806,6 +820,7 @@ def set_transaction_isolation(self, level: str) -> None:
806820
f"Setting transaction isolation level '{level}' is not supported. "
807821
f"Only {TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ} is supported.",
808822
host_url=self.session.host,
823+
session_id_hex=self.get_session_id_hex(),
809824
)
810825

811826

@@ -858,6 +873,7 @@ def __iter__(self):
858873
raise ProgrammingError(
859874
"There is no active result set",
860875
host_url=self.connection.session.host,
876+
session_id_hex=self.connection.get_session_id_hex(),
861877
)
862878

863879
def _determine_parameter_approach(
@@ -998,6 +1014,7 @@ def _check_not_closed(self):
9981014
raise InterfaceError(
9991015
"Attempting operation on closed cursor",
10001016
host_url=self.connection.session.host,
1017+
session_id_hex=self.connection.get_session_id_hex(),
10011018
)
10021019

10031020
def _handle_staging_operation(
@@ -1042,6 +1059,7 @@ def _handle_staging_operation(
10421059
raise ProgrammingError(
10431060
"You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands",
10441061
host_url=self.connection.session.host,
1062+
session_id_hex=self.connection.get_session_id_hex(),
10451063
)
10461064

10471065
abs_staging_allowed_local_paths = [
@@ -1068,6 +1086,7 @@ def _handle_staging_operation(
10681086
raise ProgrammingError(
10691087
"Local file operations are restricted to paths within the configured staging_allowed_local_path",
10701088
host_url=self.connection.session.host,
1089+
session_id_hex=self.connection.get_session_id_hex(),
10711090
)
10721091

10731092
handler_args = {
@@ -1096,6 +1115,7 @@ def _handle_staging_operation(
10961115
f"Operation {row.operation} is not supported. "
10971116
+ "Supported operations are GET, PUT, and REMOVE",
10981117
host_url=self.connection.session.host,
1118+
session_id_hex=self.connection.get_session_id_hex(),
10991119
)
11001120

11011121
@log_latency(StatementType.SQL)
@@ -1111,6 +1131,7 @@ def _handle_staging_put(
11111131
raise ProgrammingError(
11121132
"Cannot perform PUT without specifying a local_file",
11131133
host_url=self.connection.session.host,
1134+
session_id_hex=self.connection.get_session_id_hex(),
11141135
)
11151136

11161137
with open(local_file, "rb") as fh:
@@ -1136,6 +1157,7 @@ def _handle_staging_http_response(self, r):
11361157
raise OperationalError(
11371158
f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}",
11381159
host_url=self.connection.session.host,
1160+
session_id_hex=self.connection.get_session_id_hex(),
11391161
)
11401162

11411163
if r.status == ACCEPTED:
@@ -1167,6 +1189,7 @@ def _handle_staging_put_stream(
11671189
raise ProgrammingError(
11681190
"No input stream provided for streaming operation",
11691191
host_url=self.connection.session.host,
1192+
session_id_hex=self.connection.get_session_id_hex(),
11701193
)
11711194

11721195
r = self.connection.http_client.request(
@@ -1188,6 +1211,7 @@ def _handle_staging_get(
11881211
raise ProgrammingError(
11891212
"Cannot perform GET without specifying a local_file",
11901213
host_url=self.connection.session.host,
1214+
session_id_hex=self.connection.get_session_id_hex(),
11911215
)
11921216

11931217
r = self.connection.http_client.request(
@@ -1202,6 +1226,7 @@ def _handle_staging_get(
12021226
raise OperationalError(
12031227
f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}",
12041228
host_url=self.connection.session.host,
1229+
session_id_hex=self.connection.get_session_id_hex(),
12051230
)
12061231

12071232
with open(local_file, "wb") as fp:
@@ -1223,6 +1248,7 @@ def _handle_staging_remove(
12231248
raise OperationalError(
12241249
f"Staging operation over HTTP was unsuccessful: {r.status}-{error_text}",
12251250
host_url=self.connection.session.host,
1251+
session_id_hex=self.connection.get_session_id_hex(),
12261252
)
12271253

12281254
@log_latency(StatementType.QUERY)
@@ -1414,6 +1440,7 @@ def get_async_execution_result(self):
14141440
raise OperationalError(
14151441
f"get_execution_result failed with Operation status {operation_state}",
14161442
host_url=self.connection.session.host,
1443+
session_id_hex=self.connection.get_session_id_hex(),
14171444
)
14181445

14191446
def executemany(self, operation, seq_of_parameters):
@@ -1542,6 +1569,7 @@ def fetchall(self) -> List[Row]:
15421569
raise ProgrammingError(
15431570
"There is no active result set",
15441571
host_url=self.connection.session.host,
1572+
session_id_hex=self.connection.get_session_id_hex(),
15451573
)
15461574

15471575
def fetchone(self) -> Optional[Row]:
@@ -1559,6 +1587,7 @@ def fetchone(self) -> Optional[Row]:
15591587
raise ProgrammingError(
15601588
"There is no active result set",
15611589
host_url=self.connection.session.host,
1590+
session_id_hex=self.connection.get_session_id_hex(),
15621591
)
15631592

15641593
def fetchmany(self, size: int) -> List[Row]:
@@ -1584,6 +1613,7 @@ def fetchmany(self, size: int) -> List[Row]:
15841613
raise ProgrammingError(
15851614
"There is no active result set",
15861615
host_url=self.connection.session.host,
1616+
session_id_hex=self.connection.get_session_id_hex(),
15871617
)
15881618

15891619
def fetchall_arrow(self) -> "pyarrow.Table":
@@ -1594,6 +1624,7 @@ def fetchall_arrow(self) -> "pyarrow.Table":
15941624
raise ProgrammingError(
15951625
"There is no active result set",
15961626
host_url=self.connection.session.host,
1627+
session_id_hex=self.connection.get_session_id_hex(),
15971628
)
15981629

15991630
def fetchmany_arrow(self, size) -> "pyarrow.Table":
@@ -1604,6 +1635,7 @@ def fetchmany_arrow(self, size) -> "pyarrow.Table":
16041635
raise ProgrammingError(
16051636
"There is no active result set",
16061637
host_url=self.connection.session.host,
1638+
session_id_hex=self.connection.get_session_id_hex(),
16071639
)
16081640

16091641
def cancel(self) -> None:

src/databricks/sql/exc.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,15 @@ class Error(Exception):
1111
`context`: Optional extra context about the error. MUST be JSON serializable
1212
"""
1313

14-
def __init__(self, message=None, context=None, host_url=None, *args, **kwargs):
14+
def __init__(
15+
self,
16+
message=None,
17+
context=None,
18+
host_url=None,
19+
session_id_hex=None,
20+
*args,
21+
**kwargs,
22+
):
1523
super().__init__(message, *args, **kwargs)
1624
self.message = message
1725
self.context = context or {}
@@ -23,7 +31,9 @@ def __init__(self, message=None, context=None, host_url=None, *args, **kwargs):
2331
telemetry_client = TelemetryClientFactory.get_telemetry_client(
2432
host_url=host_url
2533
)
26-
telemetry_client.export_failure_log(error_name, self.message)
34+
telemetry_client.export_failure_log(
35+
error_name, self.message, session_id=session_id_hex
36+
)
2737

2838
def __str__(self):
2939
return self.message

src/databricks/sql/telemetry/latency_logger.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def wrapper(self, *args, **kwargs):
212212
latency_ms=duration_ms,
213213
sql_execution_event=sql_exec_event,
214214
sql_statement_id=telemetry_data.get("statement_id"),
215+
session_id=session_id_hex,
215216
)
216217

217218
return wrapper

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ def __new__(cls):
147147
cls._instance = super(NoopTelemetryClient, cls).__new__(cls)
148148
return cls._instance
149149

150-
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
150+
def export_initial_telemetry_log(self, driver_connection_params, user_agent, session_id=None):
151151
pass
152152

153-
def export_failure_log(self, error_name, error_message):
153+
def export_failure_log(self, error_name, error_message, session_id=None):
154154
pass
155155

156-
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
156+
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id, session_id=None):
157157
pass
158158

159159
def close(self):
@@ -352,19 +352,22 @@ def _telemetry_request_callback(self, future, sent_count: int):
352352
except Exception as e:
353353
logger.debug("Telemetry request failed with exception: %s", e)
354354

355-
def _export_telemetry_log(self, **telemetry_event_kwargs):
355+
def _export_telemetry_log(self, session_id=None, **telemetry_event_kwargs):
356356
"""
357357
Common helper method for exporting telemetry logs.
358358
359359
Args:
360+
session_id: Optional session ID for this event. If not provided, uses the client's session ID.
360361
**telemetry_event_kwargs: Keyword arguments to pass to TelemetryEvent constructor
361362
"""
362-
logger.debug("Exporting telemetry log for connection %s", self._session_id_hex)
363+
# Use provided session_id or fall back to client's session_id
364+
actual_session_id = session_id or self._session_id_hex
365+
logger.debug("Exporting telemetry log for connection %s", actual_session_id)
363366

364367
try:
365368
# Set common fields for all telemetry events
366369
event_kwargs = {
367-
"session_id": self._session_id_hex,
370+
"session_id": actual_session_id,
368371
"system_configuration": TelemetryHelper.get_driver_system_configuration(),
369372
"driver_connection_params": self._driver_connection_params,
370373
}
@@ -387,17 +390,18 @@ def _export_telemetry_log(self, **telemetry_event_kwargs):
387390
except Exception as e:
388391
logger.debug("Failed to export telemetry log: %s", e)
389392

390-
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
393+
def export_initial_telemetry_log(self, driver_connection_params, user_agent, session_id=None):
391394
self._driver_connection_params = driver_connection_params
392395
self._user_agent = user_agent
393-
self._export_telemetry_log()
396+
self._export_telemetry_log(session_id=session_id)
394397

395-
def export_failure_log(self, error_name, error_message):
398+
def export_failure_log(self, error_name, error_message, session_id=None):
396399
error_info = DriverErrorInfo(error_name=error_name, stack_trace=error_message)
397-
self._export_telemetry_log(error_info=error_info)
400+
self._export_telemetry_log(session_id=session_id, error_info=error_info)
398401

399-
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
402+
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id, session_id=None):
400403
self._export_telemetry_log(
404+
session_id=session_id,
401405
sql_statement_id=sql_statement_id,
402406
sql_operation=sql_execution_event,
403407
operation_latency_ms=latency_ms,

0 commit comments

Comments
 (0)