4343
4444from databricks .sql .types import Row , SSLOptions
4545from databricks .sql .auth .auth import get_python_sql_connector_auth_provider
46- from databricks .sql .experimental .oauth_persistence import OAuthPersistence
47-
46+ from databricks .sql .telemetry .telemetry_client import (
47+ telemetry_client ,
48+ NoopTelemetryClient ,
49+ )
4850from databricks .sql .thrift_api .TCLIService .ttypes import (
4951 TSparkParameter ,
5052 TOperationState ,
5153)
52-
54+ from databricks .sql .telemetry .latency_logger import log_latency
55+ from databricks .sql .telemetry .models .enums import DriverVolumeOperationType
5356
5457logger = logging .getLogger (__name__ )
5558
@@ -238,6 +241,9 @@ def read(self) -> Optional[OAuthToken]:
238241 self .telemetry_enabled = (
239242 self .client_telemetry_enabled and self .server_telemetry_enabled
240243 )
244+ telemetry_batch_size = kwargs .get (
245+ "telemetry_batch_size" , 100
246+ ) # TODO: Decide on batch size
241247
242248 user_agent_entry = kwargs .get ("user_agent_entry" )
243249 if user_agent_entry is None :
@@ -294,6 +300,25 @@ def read(self) -> Optional[OAuthToken]:
294300 kwargs .get ("use_inline_params" , False )
295301 )
296302
303+ if self .telemetry_enabled :
304+ telemetry_client .initialize (
305+ host = self .host ,
306+ connection_uuid = self .get_session_id_hex (),
307+ auth_provider = auth_provider ,
308+ is_authenticated = True , # TODO: Add authentication logic later
309+ batch_size = telemetry_batch_size ,
310+ user_agent = useragent_header ,
311+ )
312+
313+ telemetry_client .export_initial_telemetry_log (
314+ http_path ,
315+ self .port ,
316+ kwargs .get ("_socket_timeout" , None ),
317+ self .get_session_id_hex (),
318+ )
319+ else :
320+ self .telemetry_client = NoopTelemetryClient ()
321+
297322 def _set_use_inline_params_with_warning (self , value : Union [bool , str ]):
298323 """Valid values are True, False, and "silent"
299324
@@ -430,6 +455,9 @@ def _close(self, close_cursors=True) -> None:
430455
431456 self .open = False
432457
458+ if self .telemetry_enabled :
459+ telemetry_client .close (self .get_session_id_hex ())
460+
def commit(self):
    """Do nothing: Databricks does not support transactions.

    Present so callers can invoke ``commit()`` without error; it always
    returns ``None``.
    """
@@ -487,7 +515,10 @@ def __iter__(self):
487515 for row in self .active_result_set :
488516 yield row
489517 else :
490- raise Error ("There is no active result set" )
518+ raise Error (
519+ "There is no active result set" ,
520+ connection_uuid = self .connection .get_session_id_hex (),
521+ )
491522
492523 def _determine_parameter_approach (
493524 self , params : Optional [TParameterCollection ]
@@ -624,7 +655,10 @@ def _close_and_clear_active_result_set(self):
624655
625656 def _check_not_closed (self ):
626657 if not self .open :
627- raise Error ("Attempting operation on closed cursor" )
658+ raise Error (
659+ "Attempting operation on closed cursor" ,
660+ connection_uuid = self .connection .get_session_id_hex (),
661+ )
628662
629663 def _handle_staging_operation (
630664 self , staging_allowed_local_path : Union [None , str , List [str ]]
@@ -642,7 +676,8 @@ def _handle_staging_operation(
642676 _staging_allowed_local_paths = staging_allowed_local_path
643677 else :
644678 raise Error (
645- "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands"
679+ "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands" ,
680+ connection_uuid = self .connection .get_session_id_hex (),
646681 )
647682
648683 abs_staging_allowed_local_paths = [
@@ -671,7 +706,8 @@ def _handle_staging_operation(
671706 continue
672707 if not allow_operation :
673708 raise Error (
674- "Local file operations are restricted to paths within the configured staging_allowed_local_path"
709+ "Local file operations are restricted to paths within the configured staging_allowed_local_path" ,
710+ connection_uuid = self .connection .get_session_id_hex (),
675711 )
676712
677713 # May be real headers, or could be json string
@@ -701,9 +737,11 @@ def _handle_staging_operation(
701737 else :
702738 raise Error (
703739 f"Operation { row .operation } is not supported. "
704- + "Supported operations are GET, PUT, and REMOVE"
740+ + "Supported operations are GET, PUT, and REMOVE" ,
741+ connection_uuid = self .connection .get_session_id_hex (),
705742 )
706743
744+ @log_latency ()
707745 def _handle_staging_put (
708746 self , presigned_url : str , local_file : str , headers : Optional [dict ] = None
709747 ):
@@ -713,7 +751,13 @@ def _handle_staging_put(
713751 """
714752
715753 if local_file is None :
716- raise Error ("Cannot perform PUT without specifying a local_file" )
754+ raise Error (
755+ "Cannot perform PUT without specifying a local_file" ,
756+ connection_uuid = self .connection .get_session_id_hex (),
757+ )
758+
759+ self .volume_operation_type = DriverVolumeOperationType .PUT
760+ self .volume_path = local_file
717761
718762 with open (local_file , "rb" ) as fh :
719763 r = requests .put (url = presigned_url , data = fh , headers = headers )
@@ -730,7 +774,8 @@ def _handle_staging_put(
730774
731775 if r .status_code not in [OK , CREATED , NO_CONTENT , ACCEPTED ]:
732776 raise Error (
733- f"Staging operation over HTTP was unsuccessful: { r .status_code } -{ r .text } "
777+ f"Staging operation over HTTP was unsuccessful: { r .status_code } -{ r .text } " ,
778+ connection_uuid = self .connection .get_session_id_hex (),
734779 )
735780
736781 if r .status_code == ACCEPTED :
@@ -739,6 +784,7 @@ def _handle_staging_put(
739784 + "but not yet applied on the server. It's possible this command may fail later."
740785 )
741786
787+ @log_latency ()
742788 def _handle_staging_get (
743789 self , local_file : str , presigned_url : str , headers : Optional [dict ] = None
744790 ):
@@ -748,25 +794,38 @@ def _handle_staging_get(
748794 """
749795
750796 if local_file is None :
751- raise Error ("Cannot perform GET without specifying a local_file" )
797+ raise Error (
798+ "Cannot perform GET without specifying a local_file" ,
799+ connection_uuid = self .connection .get_session_id_hex (),
800+ )
801+
802+ self .volume_operation_type = DriverVolumeOperationType .GET
803+ self .volume_path = local_file
752804
753805 r = requests .get (url = presigned_url , headers = headers )
754806
755807 # response.ok verifies the status code is not between 400-600.
756808 # Any 2xx or 3xx will evaluate r.ok == True
757809 if not r .ok :
758810 raise Error (
759- f"Staging operation over HTTP was unsuccessful: { r .status_code } -{ r .text } "
811+ f"Staging operation over HTTP was unsuccessful: { r .status_code } -{ r .text } " ,
812+ connection_uuid = self .connection .get_session_id_hex (),
760813 )
761814
762815 with open (local_file , "wb" ) as fp :
763816 fp .write (r .content )
764817
818+ @log_latency ()
765819 def _handle_staging_remove (
766820 self , presigned_url : str , headers : Optional [dict ] = None
767821 ):
768822 """Make an HTTP DELETE request to the presigned_url"""
769823
824+ self .volume_operation_type = DriverVolumeOperationType .DELETE
825+ self .volume_path = (
826+ presigned_url # Using presigned URL as path since there's no local file
827+ )
828+
770829 r = requests .delete (url = presigned_url , headers = headers )
771830
772831 if not r .ok :
@@ -970,7 +1029,8 @@ def get_async_execution_result(self):
9701029 return self
9711030 else :
9721031 raise Error (
973- f"get_execution_result failed with Operation status { operation_state } "
1032+ f"get_execution_result failed with Operation status { operation_state } " ,
1033+ connection_uuid = self .connection .get_session_id_hex (),
9741034 )
9751035
9761036 def executemany (self , operation , seq_of_parameters ):
@@ -1120,7 +1180,10 @@ def fetchall(self) -> List[Row]:
11201180 if self .active_result_set :
11211181 return self .active_result_set .fetchall ()
11221182 else :
1123- raise Error ("There is no active result set" )
1183+ raise Error (
1184+ "There is no active result set" ,
1185+ connection_uuid = self .connection .get_session_id_hex (),
1186+ )
11241187
11251188 def fetchone (self ) -> Optional [Row ]:
11261189 """
@@ -1134,7 +1197,10 @@ def fetchone(self) -> Optional[Row]:
11341197 if self .active_result_set :
11351198 return self .active_result_set .fetchone ()
11361199 else :
1137- raise Error ("There is no active result set" )
1200+ raise Error (
1201+ "There is no active result set" ,
1202+ connection_uuid = self .connection .get_session_id_hex (),
1203+ )
11381204
11391205 def fetchmany (self , size : int ) -> List [Row ]:
11401206 """
@@ -1156,21 +1222,30 @@ def fetchmany(self, size: int) -> List[Row]:
11561222 if self .active_result_set :
11571223 return self .active_result_set .fetchmany (size )
11581224 else :
1159- raise Error ("There is no active result set" )
1225+ raise Error (
1226+ "There is no active result set" ,
1227+ connection_uuid = self .connection .get_session_id_hex (),
1228+ )
11601229
def fetchall_arrow(self) -> "pyarrow.Table":
    """Return ``fetchall_arrow()`` of the active result set.

    ``_check_not_closed()`` is called first. If there is no active result
    set, ``Error`` is raised with the connection's session id (hex) passed
    as ``connection_uuid``.
    """
    self._check_not_closed()

    # Guard clause: nothing to fetch from.
    if not self.active_result_set:
        raise Error(
            "There is no active result set",
            connection_uuid=self.connection.get_session_id_hex(),
        )

    return self.active_result_set.fetchall_arrow()
11671239
def fetchmany_arrow(self, size) -> "pyarrow.Table":
    """Return ``fetchmany_arrow(size)`` of the active result set.

    ``size`` is forwarded unchanged to the result set. ``_check_not_closed()``
    is called first. If there is no active result set, ``Error`` is raised
    with the connection's session id (hex) passed as ``connection_uuid``.
    """
    self._check_not_closed()

    # Guard clause: nothing to fetch from.
    if not self.active_result_set:
        raise Error(
            "There is no active result set",
            connection_uuid=self.connection.get_session_id_hex(),
        )

    return self.active_result_set.fetchmany_arrow(size)
11741249
11751250 def cancel (self ) -> None :
11761251 """
0 commit comments