From 489c82df71a8a8ffd837278950ea3bec124e5246 Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Thu, 15 May 2025 11:04:13 +0530
Subject: [PATCH 1/7] PECOBLR-86 Improve logging for debug level

Signed-off-by: Sai Shree Pradhan
---
 src/databricks/sql/client.py         | 3 +++
 src/databricks/sql/thrift_backend.py | 7 +++++++
 2 files changed, 10 insertions(+)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index ea901c3ae..46d710d93 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -214,6 +214,8 @@ def read(self) -> Optional[OAuthToken]:
         # use_cloud_fetch
         #   Enable use of cloud fetch to extract large query results in parallel via cloud storage
 
+        logger.debug(f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})")
+
         if access_token:
             access_token_kv = {"access_token": access_token}
             kwargs = {**kwargs, **access_token_kv}
@@ -787,6 +789,7 @@ def execute(
 
         :returns self
         """
+        logger.debug(f"Cursor.execute(operation={operation}, parameters={parameters})")
 
         param_approach = self._determine_parameter_approach(parameters)
         if param_approach == ParameterApproach.NONE:
diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index 2e3478d77..8b9dd9e88 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -131,6 +131,8 @@ def __init__(
         # max_download_threads
         #   Number of threads for handling cloud fetch downloads. Defaults to 10
 
+        logger.debug(f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})")
+
         port = port or 443
         if kwargs.get("_connection_uri"):
             uri = kwargs.get("_connection_uri")
@@ -390,6 +392,8 @@ def attempt_request(attempt):
 
                 # TODO: don't use exception handling for GOS polling...
 
+ logger.debug(f"ThriftBackend.attempt_request: HTTPError: {err}") + gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: delay_default = ( @@ -404,6 +408,7 @@ def attempt_request(attempt): else: raise err except OSError as err: + logger.debug(f"ThriftBackend.attempt_request: OSError: {err}") error = err error_message = str(err) # fmt: off @@ -434,6 +439,7 @@ def attempt_request(attempt): else: logger.warning(log_string) except Exception as err: + logger.debug(f"ThriftBackend.attempt_request: Exception: {err}") error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( @@ -1074,6 +1080,7 @@ def fetch_results( return queue, resp.hasMoreRows def close_command(self, op_handle): + logger.debug(f"ThriftBackend.close_command(op_handle={op_handle})") req = ttypes.TCloseOperationReq(operationHandle=op_handle) resp = self.make_request(self._client.CloseOperation, req) return resp.status From c8a8045ecefe5a5a62c9cbef82c22eb4f7fd4eb0 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 11:10:28 +0530 Subject: [PATCH 2/7] PECOBLR-86 Improve logging for debug level Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/thrift_backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 8b9dd9e88..0bcb18c70 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -894,6 +894,7 @@ def execute_command( ): assert session_handle is not None + logger.debug(f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})") spark_arrow_types = ttypes.TSparkArrowTypes( timestampAsArrow=self._use_arrow_native_timestamps, decimalAsArrow=self._use_arrow_native_decimals, From 0adfeddd9d3133c786d2f039f553948ab0e8d257 Mon Sep 17 00:00:00 2001 From: saishreeeee Date: Thu, 15 May 2025 17:08:53 +0530 Subject: [PATCH 3/7] fixed format Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 4 +++- src/databricks/sql/thrift_backend.py | 15 ++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 46d710d93..08359d775 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -214,7 +214,9 @@ def read(self) -> Optional[OAuthToken]: # use_cloud_fetch # Enable use of cloud fetch to extract large query results in parallel via cloud storage - logger.debug(f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})") + logger.debug( + f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})" + ) if access_token: access_token_kv = {"access_token": access_token} diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 0bcb18c70..6e85392ab 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -131,7 +131,9 @@ def __init__( # max_download_threads # Number of threads for handling cloud fetch downloads. Defaults to 10 - logger.debug(f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})") + logger.debug( + f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})" + ) port = port or 443 if kwargs.get("_connection_uri"): @@ -392,7 +394,7 @@ def attempt_request(attempt): # TODO: don't use exception handling for GOS polling... 
- logger.debug(f"ThriftBackend.attempt_request: HTTPError: {err}") + logger.error(f"ThriftBackend.attempt_request: HTTPError: {err}") gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: @@ -408,7 +410,7 @@ def attempt_request(attempt): else: raise err except OSError as err: - logger.debug(f"ThriftBackend.attempt_request: OSError: {err}") + logger.error(f"ThriftBackend.attempt_request: OSError: {err}") error = err error_message = str(err) # fmt: off @@ -439,7 +441,7 @@ def attempt_request(attempt): else: logger.warning(log_string) except Exception as err: - logger.debug(f"ThriftBackend.attempt_request: Exception: {err}") + logger.error(f"ThriftBackend.attempt_request: Exception: {err}") error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( @@ -894,7 +896,10 @@ def execute_command( ): assert session_handle is not None - logger.debug(f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})") + logger.debug( + f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})" + ) + spark_arrow_types = ttypes.TSparkArrowTypes( timestampAsArrow=self._use_arrow_native_timestamps, decimalAsArrow=self._use_arrow_native_decimals, From 511778711b6204b02a98e39a7939e206ae3c8054 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 18:09:40 +0530 Subject: [PATCH 4/7] used lazy logging Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 8 ++++++-- src/databricks/sql/thrift_backend.py | 15 ++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 08359d775..87f3bbb90 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -215,7 +215,9 @@ def read(self) -> Optional[OAuthToken]: # Enable use of cloud fetch to extract large query results in parallel via cloud storage logger.debug( - f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})" + "Connection.__init__(server_hostname=%s, http_path=%s)", + server_hostname, + http_path, ) if access_token: @@ -791,7 +793,9 @@ def execute( :returns self """ - logger.debug(f"Cursor.execute(operation={operation}, parameters={parameters})") + logger.debug( + "Cursor.execute(operation=%s, parameters=%s)", operation, parameters + ) param_approach = self._determine_parameter_approach(parameters) if param_approach == ParameterApproach.NONE: diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 6e85392ab..f8861c677 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -132,7 +132,10 @@ def __init__( # Number of threads for handling cloud fetch downloads. Defaults to 10 logger.debug( - f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})" + "ThriftBackend.__init__(server_hostname=%s, port=%s, http_path=%s)", + server_hostname, + port, + http_path, ) port = port or 443 @@ -394,7 +397,7 @@ def attempt_request(attempt): # TODO: don't use exception handling for GOS polling... 
- logger.error(f"ThriftBackend.attempt_request: HTTPError: {err}") + logger.error("ThriftBackend.attempt_request: HTTPError: %s", err) gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: @@ -410,7 +413,7 @@ def attempt_request(attempt): else: raise err except OSError as err: - logger.error(f"ThriftBackend.attempt_request: OSError: {err}") + logger.error("ThriftBackend.attempt_request: OSError: %s", err) error = err error_message = str(err) # fmt: off @@ -441,7 +444,7 @@ def attempt_request(attempt): else: logger.warning(log_string) except Exception as err: - logger.error(f"ThriftBackend.attempt_request: Exception: {err}") + logger.error("ThriftBackend.attempt_request: Exception: %s", err) error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( @@ -897,7 +900,9 @@ def execute_command( assert session_handle is not None logger.debug( - f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})" + "ThriftBackend.execute_command(operation=%s, session_handle=%s)", + operation, + session_handle, ) spark_arrow_types = ttypes.TSparkArrowTypes( From a1940896e6ce6680abe2431bc52edbef1abed210 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 18:27:31 +0530 Subject: [PATCH 5/7] changed debug to error logs Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/thrift_backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index f8861c677..43f800ddc 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -413,7 +413,6 @@ def attempt_request(attempt): else: raise err except OSError as err: - logger.error("ThriftBackend.attempt_request: OSError: %s", err) error = err error_message = str(err) # fmt: off From 786a7b06182f740f8631fa7df73a2cb4decad72e Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 22 May 2025 16:18:06 +0530 Subject: [PATCH 6/7] added classes required for telemetry Signed-off-by: Sai Shree Pradhan --- examples/query_execute.py | 2 +- examples/test_telemetry.py | 20 +++++++++ src/databricks/sql/client.py | 45 +++++++++++++++++++ .../telemetry/DriverConnectionParameters.py | 24 ++++++++++ .../sql/telemetry/DriverErrorInfo.py | 11 +++++ .../telemetry/DriverSystemConfiguration.py | 37 +++++++++++++++ .../sql/telemetry/DriverVolumeOperation.py | 14 ++++++ .../sql/telemetry/FrontendLogContext.py | 11 +++++ .../sql/telemetry/FrontendLogEntry.py | 11 +++++ src/databricks/sql/telemetry/HostDetails.py | 11 +++++ .../sql/telemetry/NoopTelemetryClient.py | 11 +++++ .../sql/telemetry/SqlExecutionEvent.py | 15 +++++++ .../sql/telemetry/TelemetryClient.py | 36 +++++++++++++++ .../sql/telemetry/TelemetryClientContext.py | 11 +++++ .../sql/telemetry/TelemetryEvent.py | 25 +++++++++++ .../sql/telemetry/TelemetryFrontendLog.py | 15 +++++++ .../sql/telemetry/TelemetryHelper.py | 32 +++++++++++++ .../sql/telemetry/TelemetryRequest.py | 13 ++++++ .../sql/telemetry/TelemetryResponse.py | 13 ++++++ .../sql/telemetry/enums/AuthFlow.py | 8 ++++ .../sql/telemetry/enums/AuthMech.py | 7 +++ .../telemetry/enums/DatabricksClientType.py | 6 +++ .../enums/DriverVolumeOperationType.py | 10 +++++ .../telemetry/enums/ExecutionResultFormat.py | 8 ++++ .../sql/telemetry/enums/StatementType.py | 9 ++++ 25 files changed, 404 insertions(+), 1 deletion(-) create mode 100644 examples/test_telemetry.py create mode 100644 
From 786a7b06182f740f8631fa7df73a2cb4decad72e Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Thu, 22 May 2025 16:18:06 +0530
Subject: [PATCH 6/7] added classes required for telemetry

Signed-off-by: Sai Shree Pradhan
---
 examples/query_execute.py                     |  2 +-
 examples/test_telemetry.py                    | 20 +++++++++
 src/databricks/sql/client.py                  | 45 +++++++++++++++++++
 .../telemetry/DriverConnectionParameters.py   | 24 ++++++++++
 .../sql/telemetry/DriverErrorInfo.py          | 11 +++++
 .../telemetry/DriverSystemConfiguration.py    | 37 +++++++++++++++
 .../sql/telemetry/DriverVolumeOperation.py    | 14 ++++++
 .../sql/telemetry/FrontendLogContext.py       | 11 +++++
 .../sql/telemetry/FrontendLogEntry.py         | 11 +++++
 src/databricks/sql/telemetry/HostDetails.py   | 11 +++++
 .../sql/telemetry/NoopTelemetryClient.py      | 11 +++++
 .../sql/telemetry/SqlExecutionEvent.py        | 15 +++++++
 .../sql/telemetry/TelemetryClient.py          | 36 +++++++++++++++
 .../sql/telemetry/TelemetryClientContext.py   | 11 +++++
 .../sql/telemetry/TelemetryEvent.py           | 25 +++++++++++
 .../sql/telemetry/TelemetryFrontendLog.py     | 15 +++++++
 .../sql/telemetry/TelemetryHelper.py          | 32 +++++++++++++
 .../sql/telemetry/TelemetryRequest.py         | 13 ++++++
 .../sql/telemetry/TelemetryResponse.py        | 13 ++++++
 .../sql/telemetry/enums/AuthFlow.py           |  8 ++++
 .../sql/telemetry/enums/AuthMech.py           |  7 +++
 .../telemetry/enums/DatabricksClientType.py   |  6 +++
 .../enums/DriverVolumeOperationType.py        | 10 +++++
 .../telemetry/enums/ExecutionResultFormat.py  |  8 ++++
 .../sql/telemetry/enums/StatementType.py      |  9 ++++
 25 files changed, 404 insertions(+), 1 deletion(-)
 create mode 100644 examples/test_telemetry.py
 create mode 100644 src/databricks/sql/telemetry/DriverConnectionParameters.py
 create mode 100644 src/databricks/sql/telemetry/DriverErrorInfo.py
 create mode 100644 src/databricks/sql/telemetry/DriverSystemConfiguration.py
 create mode 100644 src/databricks/sql/telemetry/DriverVolumeOperation.py
 create mode 100644 src/databricks/sql/telemetry/FrontendLogContext.py
 create mode 100644 src/databricks/sql/telemetry/FrontendLogEntry.py
 create mode 100644 src/databricks/sql/telemetry/HostDetails.py
 create mode 100644 src/databricks/sql/telemetry/NoopTelemetryClient.py
 create mode 100644 src/databricks/sql/telemetry/SqlExecutionEvent.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryClient.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryClientContext.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryEvent.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryFrontendLog.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryHelper.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryRequest.py
 create mode 100644 src/databricks/sql/telemetry/TelemetryResponse.py
 create mode 100644 src/databricks/sql/telemetry/enums/AuthFlow.py
 create mode 100644 src/databricks/sql/telemetry/enums/AuthMech.py
 create mode 100644 src/databricks/sql/telemetry/enums/DatabricksClientType.py
 create mode 100644 src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py
 create mode 100644 src/databricks/sql/telemetry/enums/ExecutionResultFormat.py
 create mode 100644 src/databricks/sql/telemetry/enums/StatementType.py

diff --git a/examples/query_execute.py b/examples/query_execute.py
index 38d2f17a8..d9ed5d8ee 100644
--- a/examples/query_execute.py
+++ b/examples/query_execute.py
@@ -8,7 +8,7 @@
 ) as connection:
 
     with connection.cursor() as cursor:
-        cursor.execute("SELECT * FROM default.diamonds LIMIT 2")
+        cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1")
         result = cursor.fetchall()
 
         for row in result:
diff --git a/examples/test_telemetry.py b/examples/test_telemetry.py
new file mode 100644
index 000000000..4b419eaf9
--- /dev/null
+++ b/examples/test_telemetry.py
@@ -0,0 +1,20 @@
+import os
+import databricks.sql as sql
+
+# Create connection with telemetry enabled
+conn = sql.connect(
+    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
+    http_path=os.environ["DATABRICKS_HTTP_PATH"],
+    access_token=os.environ["DATABRICKS_TOKEN"],
+    enable_telemetry=True,  # Enable telemetry
+    telemetry_batch_size=1  # Set batch size to 1
+)
+
+# Execute a simple query to generate telemetry
+cursor = conn.cursor()
+cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1")
+cursor.fetchall()
+
+# Close the connection
+cursor.close()
+conn.close()
\ No newline at end of file
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 87f3bbb90..22b54df02 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -1,5 +1,21 @@
 import time
 from typing import Dict, Tuple, List, Optional, Any, Union, Sequence
+import uuid
+from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent
+from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
+from databricks.sql.telemetry.TelemetryClient import TelemetryClient
+from databricks.sql.telemetry.NoopTelemetryClient import NoopTelemetryClient
+from databricks.sql.telemetry.TelemetryFrontendLog import TelemetryFrontendLog
+from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext
+from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext
+from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry
+from databricks.sql.auth.auth import AuthType
+from databricks.sql.auth.authenticators import (
+    DatabricksOAuthProvider,
+    ExternalAuthProvider,
+    AuthProvider,
+    AccessTokenAuthProvider,
+)
 
 import pandas
 
@@ -234,6 +250,32 @@ def read(self) -> Optional[OAuthToken]:
             server_hostname, **kwargs
         )
 
+        self.server_telemetry_enabled = True
+        self.client_telemetry_enabled = kwargs.get("enable_telemetry", False)
+        self.telemetry_enabled = (
+            self.client_telemetry_enabled and self.server_telemetry_enabled
+        )
+        telemetry_batch_size = kwargs.get("telemetry_batch_size", 200)
+
+        if self.telemetry_enabled:
+            self.telemetry_client = TelemetryClient(
+                host=self.host,
+                connection_uuid="test-connection-uuid",
+                auth_provider=auth_provider,
+                is_authenticated=(
+                    isinstance(auth_provider, AccessTokenAuthProvider)
+                    or isinstance(auth_provider, DatabricksOAuthProvider)
+                    or isinstance(auth_provider, ExternalAuthProvider)
+                    or (
+                        isinstance(auth_provider, AuthProvider)
+                        and hasattr(auth_provider, "_header_factory")
+                    )
+                ),
+                batch_size=telemetry_batch_size,
+            )
+        else:
+            self.telemetry_client = NoopTelemetryClient()
+
         user_agent_entry = kwargs.get("user_agent_entry")
         if user_agent_entry is None:
             user_agent_entry = kwargs.get("_user_agent_entry")
@@ -419,6 +461,9 @@ def _close(self, close_cursors=True) -> None:
 
         self.open = False
 
+        if hasattr(self, "telemetry_client"):
+            self.telemetry_client.close()
+
     def commit(self):
         """No-op because Databricks does not support transactions"""
         pass
diff --git a/src/databricks/sql/telemetry/DriverConnectionParameters.py b/src/databricks/sql/telemetry/DriverConnectionParameters.py
new file mode 100644
index 000000000..0dc69512e
--- /dev/null
+++ b/src/databricks/sql/telemetry/DriverConnectionParameters.py
@@ -0,0 +1,24 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.HostDetails import HostDetails
+from databricks.sql.telemetry.enums.AuthMech import AuthMech
+from databricks.sql.telemetry.enums.AuthFlow import AuthFlow
+from databricks.sql.telemetry.enums.DatabricksClientType import DatabricksClientType
+
+
+@dataclass
+class DriverConnectionParameters:
+    http_path: str
+    driver_mode: DatabricksClientType
+    host_details: HostDetails
+    auth_mech: AuthMech
+    auth_flow: AuthFlow
+    auth_scope: str
+    discovery_url: str
+    allowed_volume_ingestion_paths: str
+    enable_complex_datatype_support: bool
+    azure_tenant_id: str
+    socket_timeout: int
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/DriverErrorInfo.py b/src/databricks/sql/telemetry/DriverErrorInfo.py
new file mode 100644
index 000000000..83f523756
--- /dev/null
+++ b/src/databricks/sql/telemetry/DriverErrorInfo.py
@@ -0,0 +1,11 @@
+import json
+from dataclasses import dataclass, asdict
+
+
+@dataclass
+class DriverErrorInfo:
+    error_name: str
+    stack_trace: str
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/DriverSystemConfiguration.py b/src/databricks/sql/telemetry/DriverSystemConfiguration.py
new file mode 100644
index 000000000..60af0831c
--- /dev/null
+++ b/src/databricks/sql/telemetry/DriverSystemConfiguration.py
@@ -0,0 +1,37 @@
+import json
+from dataclasses import dataclass, asdict
+import platform
+import sys
+import locale
+from databricks.sql import __version__
+
+
+@dataclass
+class DriverSystemConfiguration:
+    driver_version: str
+    os_name: str
+    os_version: str
+    os_arch: str
+    runtime_name: str
+    runtime_version: str
+    runtime_vendor: str
+    client_app_name: str
+    locale_name: str
+    driver_name: str
+    char_set_encoding: str
+
+    def __init__(self):
+        self.driver_version = __version__
+        self.os_name = platform.system()
+        self.os_version = platform.version()
+        self.os_arch = platform.machine()
+        self.runtime_name = platform.python_implementation()
+        self.runtime_version = platform.python_version()
+        self.runtime_vendor = sys.implementation.name
+        self.client_app_name = "databricks-sql-python"
+        self.locale_name = locale.getdefaultlocale()[0]
+        self.driver_name = "databricks-sql-python"
+        self.char_set_encoding = "UTF-8"
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/DriverVolumeOperation.py b/src/databricks/sql/telemetry/DriverVolumeOperation.py
new file mode 100644
index 000000000..7a6e32e4d
--- /dev/null
+++ b/src/databricks/sql/telemetry/DriverVolumeOperation.py
@@ -0,0 +1,14 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.enums.DriverVolumeOperationType import (
+    DriverVolumeOperationType,
+)
+
+
+@dataclass
+class DriverVolumeOperation:
+    volume_operation_type: DriverVolumeOperationType
+    volume_path: str
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/FrontendLogContext.py b/src/databricks/sql/telemetry/FrontendLogContext.py
new file mode 100644
index 000000000..e6145e3e4
--- /dev/null
+++ b/src/databricks/sql/telemetry/FrontendLogContext.py
@@ -0,0 +1,11 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext
+
+
+@dataclass
+class FrontendLogContext:
+    client_context: TelemetryClientContext
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/FrontendLogEntry.py b/src/databricks/sql/telemetry/FrontendLogEntry.py
new file mode 100644
index 000000000..1351ec31a
--- /dev/null
+++ b/src/databricks/sql/telemetry/FrontendLogEntry.py
@@ -0,0 +1,11 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent
+
+
+@dataclass
+class FrontendLogEntry:
+    sql_driver_log: TelemetryEvent
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/HostDetails.py b/src/databricks/sql/telemetry/HostDetails.py
new file mode 100644
index 000000000..3c2888909
--- /dev/null
+++ b/src/databricks/sql/telemetry/HostDetails.py
@@ -0,0 +1,11 @@
+import json
+from dataclasses import dataclass, asdict
+
+
+@dataclass
+class HostDetails:
+    host_url: str
+    port: int
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/NoopTelemetryClient.py b/src/databricks/sql/telemetry/NoopTelemetryClient.py
new file mode 100644
index 000000000..d81f5badf
--- /dev/null
+++ b/src/databricks/sql/telemetry/NoopTelemetryClient.py
@@ -0,0 +1,11 @@
+class NoopTelemetryClient:
+    # A no-operation telemetry client that implements the same interface but does nothing
+
+    def export_event(self, event):
+        pass
+
+    def flush(self):
+        pass
+
+    def close(self):
+        pass
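
A small portability note on DriverSystemConfiguration above: locale.getdefaultlocale() has been deprecated since Python 3.11. A hedged alternative for populating the same locale_name field (a suggestion, not what the patch does):

    import locale

    # locale.getlocale() is the non-deprecated accessor; it may return
    # (None, None) in an unconfigured environment, so keep a fallback.
    locale_name = locale.getlocale()[0] or "en_US"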
diff --git a/src/databricks/sql/telemetry/SqlExecutionEvent.py b/src/databricks/sql/telemetry/SqlExecutionEvent.py
new file mode 100644
index 000000000..9d2efae97
--- /dev/null
+++ b/src/databricks/sql/telemetry/SqlExecutionEvent.py
@@ -0,0 +1,15 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.enums.StatementType import StatementType
+from databricks.sql.telemetry.enums.ExecutionResultFormat import ExecutionResultFormat
+
+
+@dataclass
+class SqlExecutionEvent:
+    statement_type: StatementType
+    is_compressed: bool
+    execution_result: ExecutionResultFormat
+    retry_count: int
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/TelemetryClient.py b/src/databricks/sql/telemetry/TelemetryClient.py
new file mode 100644
index 000000000..22ee2ef67
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryClient.py
@@ -0,0 +1,36 @@
+import threading
+import time
+import json
+import requests
+from concurrent.futures import ThreadPoolExecutor
+
+
+class TelemetryClient:
+    def __init__(
+        self,
+        host,
+        connection_uuid,
+        auth_provider=None,
+        is_authenticated=False,
+        batch_size=200,
+    ):
+        self.host = host
+        self.connection_uuid = connection_uuid
+        self.auth_provider = auth_provider
+        self.is_authenticated = is_authenticated
+        self.batch_size = batch_size
+        self.events_batch = []
+        self.lock = threading.Lock()
+        self.executor = ThreadPoolExecutor(
+            max_workers=5
+        )  # Thread pool for async operations
+        self.DriverConnectionParameters = None
+
+    def export_event(self, event):
+        pass
+
+    def flush(self):
+        pass
+
+    def close(self):
+        pass
diff --git a/src/databricks/sql/telemetry/TelemetryClientContext.py b/src/databricks/sql/telemetry/TelemetryClientContext.py
new file mode 100644
index 000000000..f23e92897
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryClientContext.py
@@ -0,0 +1,11 @@
+from dataclasses import dataclass, asdict
+import json
+
+
+@dataclass
+class TelemetryClientContext:
+    timestamp_millis: int
+    user_agent: str
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/TelemetryEvent.py b/src/databricks/sql/telemetry/TelemetryEvent.py
new file mode 100644
index 000000000..fcbb2b77d
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryEvent.py
@@ -0,0 +1,25 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
+from databricks.sql.telemetry.DriverConnectionParameters import (
+    DriverConnectionParameters,
+)
+from databricks.sql.telemetry.DriverVolumeOperation import DriverVolumeOperation
+from databricks.sql.telemetry.SqlExecutionEvent import SqlExecutionEvent
+from databricks.sql.telemetry.DriverErrorInfo import DriverErrorInfo
+
+
+@dataclass
+class TelemetryEvent:
+    session_id: str
+    sql_statement_id: str
+    system_configuration: DriverSystemConfiguration
+    driver_connection_params: DriverConnectionParameters
+    auth_type: str
+    vol_operation: DriverVolumeOperation
+    sql_operation: SqlExecutionEvent
+    error_info: DriverErrorInfo
+    latency: int
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/TelemetryFrontendLog.py b/src/databricks/sql/telemetry/TelemetryFrontendLog.py
new file mode 100644
index 000000000..abbc1120e
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryFrontendLog.py
@@ -0,0 +1,15 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext
+from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry
+
+
+@dataclass
+class TelemetryFrontendLog:
+    workspace_id: int
+    frontend_log_event_id: str
+    context: FrontendLogContext
+    entry: FrontendLogEntry
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/TelemetryHelper.py b/src/databricks/sql/telemetry/TelemetryHelper.py
new file mode 100644
index 000000000..e2e635859
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryHelper.py
@@ -0,0 +1,32 @@
+import platform
+import sys
+import uuid
+import time
+from typing import Optional
+
+from databricks.sql import __version__
+from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
+
+
+class TelemetryHelper:
+
+    # Singleton instance of DriverSystemConfiguration
+    _DRIVER_SYSTEM_CONFIGURATION = None
+
+    @classmethod
+    def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
+        if cls._DRIVER_SYSTEM_CONFIGURATION is None:
+            cls._DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration(
+                driverName="Databricks SQL Python Connector",
+                driverVersion=__version__,
+                runtimeName=f"Python {sys.version.split()[0]}",
+                runtimeVendor=platform.python_implementation(),
+                runtimeVersion=platform.python_version(),
+                osName=platform.system(),
+                osVersion=platform.release(),
+                osArch=platform.machine(),
+                clientAppName=None,
+                localeName=f"{platform.system()}_{platform.release()}",
+                charSetEncoding=sys.getdefaultencoding(),
+            )
+        return cls._DRIVER_SYSTEM_CONFIGURATION
diff --git a/src/databricks/sql/telemetry/TelemetryRequest.py b/src/databricks/sql/telemetry/TelemetryRequest.py
new file mode 100644
index 000000000..8142e1182
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryRequest.py
@@ -0,0 +1,13 @@
+import json
+from dataclasses import dataclass, asdict
+from typing import List, Optional
+
+
+@dataclass
+class TelemetryRequest:
+    uploadTime: int
+    items: List[str]
+    protoLogs: Optional[List[str]]
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/TelemetryResponse.py b/src/databricks/sql/telemetry/TelemetryResponse.py
new file mode 100644
index 000000000..3b14050d1
--- /dev/null
+++ b/src/databricks/sql/telemetry/TelemetryResponse.py
@@ -0,0 +1,13 @@
+import json
+from dataclasses import dataclass, asdict
+from typing import List, Optional
+
+
+@dataclass
+class TelemetryResponse:
+    errors: List[str]
+    numSuccess: int
+    numProtoSuccess: int
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/enums/AuthFlow.py b/src/databricks/sql/telemetry/enums/AuthFlow.py
new file mode 100644
index 000000000..2afc35c74
--- /dev/null
+++ b/src/databricks/sql/telemetry/enums/AuthFlow.py
@@ -0,0 +1,8 @@
+from enum import Enum
+
+
+class AuthFlow(Enum):
+    TOKEN_PASSTHROUGH = "token_passthrough"
+    CLIENT_CREDENTIALS = "client_credentials"
+    BROWSER_BASED_AUTHENTICATION = "browser_based_authentication"
+    AZURE_MANAGED_IDENTITIES = "azure_managed_identities"
diff --git a/src/databricks/sql/telemetry/enums/AuthMech.py b/src/databricks/sql/telemetry/enums/AuthMech.py
new file mode 100644
index 000000000..6425eea47
--- /dev/null
+++ b/src/databricks/sql/telemetry/enums/AuthMech.py
@@ -0,0 +1,7 @@
+from enum import Enum
+
+
+class AuthMech(Enum):
+    OTHER = "other"
+    PAT = "pat"
+    OAUTH = "oauth"
diff --git a/src/databricks/sql/telemetry/enums/DatabricksClientType.py b/src/databricks/sql/telemetry/enums/DatabricksClientType.py
new file mode 100644
index 000000000..8e08c355f
--- /dev/null
+++ b/src/databricks/sql/telemetry/enums/DatabricksClientType.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+
+class DatabricksClientType(Enum):
+    SEA = "SEA"
+    THRIFT = "THRIFT"
diff --git a/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py b/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py
new file mode 100644
index 000000000..581e56c25
--- /dev/null
+++ b/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py
@@ -0,0 +1,10 @@
+from enum import Enum
+
+
+class DriverVolumeOperationType(Enum):
+    TYPE_UNSPECIFIED = "type_unspecified"
+    PUT = "put"
+    GET = "get"
+    DELETE = "delete"
+    LIST = "list"
+    QUERY = "query"
diff --git a/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py b/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py
new file mode 100644
index 000000000..23e18150e
--- /dev/null
+++ b/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py
@@ -0,0 +1,8 @@
+from enum import Enum
+
+
+class ExecutionResultFormat(Enum):
+    FORMAT_UNSPECIFIED = "format_unspecified"
+    INLINE_ARROW = "inline_arrow"
+    EXTERNAL_LINKS = "external_links"
+    COLUMNAR_INLINE = "columnar_inline"
diff --git a/src/databricks/sql/telemetry/enums/StatementType.py b/src/databricks/sql/telemetry/enums/StatementType.py
new file mode 100644
index 000000000..cea86bab8
--- /dev/null
+++ b/src/databricks/sql/telemetry/enums/StatementType.py
@@ -0,0 +1,9 @@
+from enum import Enum
+
+
+class StatementType(Enum):
+    NONE = "none"
+    QUERY = "query"
+    SQL = "sql"
+    UPDATE = "update"
+    METADATA = "metadata"
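
The TelemetryClient introduced in this patch is still a skeleton: it allocates a lock, a batch list, and a ThreadPoolExecutor, but export_event, flush, and close are all pass. A rough sketch of how those pieces could compose into a thread-safe batching exporter, under the assumption that some send callable uploads a list of events (the upload mechanism is hypothetical, not part of this patch):

    import threading
    from concurrent.futures import ThreadPoolExecutor


    class BatchingExporter:
        # Illustrative only: buffer events, hand full batches to a worker thread.

        def __init__(self, send, batch_size=200):
            self._send = send  # hypothetical callable that uploads a list of events
            self._batch_size = batch_size
            self._events = []
            self._lock = threading.Lock()
            self._executor = ThreadPoolExecutor(max_workers=5)

        def export_event(self, event):
            with self._lock:  # protect the shared buffer
                self._events.append(event)
                if len(self._events) < self._batch_size:
                    return
                batch, self._events = self._events, []
            self._executor.submit(self._send, batch)  # upload off the caller's thread

        def close(self):
            with self._lock:
                batch, self._events = self._events, []
            if batch:
                self._send(batch)  # flush the remainder synchronously
            self._executor.shutdown(wait=True)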
From a48881fc7edf8257589efb1b4e33e79cff34cc1e Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Thu, 22 May 2025 18:24:19 +0530
Subject: [PATCH 7/7] removed TelemetryHelper

Signed-off-by: Sai Shree Pradhan
---
 src/databricks/sql/client.py                  | 36 -------------------
 .../sql/telemetry/NoopTelemetryClient.py      | 11 ------
 .../sql/telemetry/TelemetryClient.py          | 36 -------------------
 .../sql/telemetry/TelemetryHelper.py          | 32 -----------------
 4 files changed, 115 deletions(-)
 delete mode 100644 src/databricks/sql/telemetry/NoopTelemetryClient.py
 delete mode 100644 src/databricks/sql/telemetry/TelemetryClient.py
 delete mode 100644 src/databricks/sql/telemetry/TelemetryHelper.py

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 22b54df02..9eef14a27 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -1,22 +1,5 @@
 import time
 from typing import Dict, Tuple, List, Optional, Any, Union, Sequence
-import uuid
-from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent
-from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
-from databricks.sql.telemetry.TelemetryClient import TelemetryClient
-from databricks.sql.telemetry.NoopTelemetryClient import NoopTelemetryClient
-from databricks.sql.telemetry.TelemetryFrontendLog import TelemetryFrontendLog
-from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext
-from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext
-from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry
-from databricks.sql.auth.auth import AuthType
-from databricks.sql.auth.authenticators import (
-    DatabricksOAuthProvider,
-    ExternalAuthProvider,
-    AuthProvider,
-    AccessTokenAuthProvider,
-)
-
 import pandas
 
 try:
@@ -257,25 +240,6 @@ def read(self) -> Optional[OAuthToken]:
         )
         telemetry_batch_size = kwargs.get("telemetry_batch_size", 200)
 
-        if self.telemetry_enabled:
-            self.telemetry_client = TelemetryClient(
-                host=self.host,
-                connection_uuid="test-connection-uuid",
-                auth_provider=auth_provider,
-                is_authenticated=(
-                    isinstance(auth_provider, AccessTokenAuthProvider)
-                    or isinstance(auth_provider, DatabricksOAuthProvider)
-                    or isinstance(auth_provider, ExternalAuthProvider)
-                    or (
-                        isinstance(auth_provider, AuthProvider)
-                        and hasattr(auth_provider, "_header_factory")
-                    )
-                ),
-                batch_size=telemetry_batch_size,
-            )
-        else:
-            self.telemetry_client = NoopTelemetryClient()
-
         user_agent_entry = kwargs.get("user_agent_entry")
         if user_agent_entry is None:
             user_agent_entry = kwargs.get("_user_agent_entry")
diff --git a/src/databricks/sql/telemetry/NoopTelemetryClient.py b/src/databricks/sql/telemetry/NoopTelemetryClient.py
deleted file mode 100644
index d81f5badf..000000000
--- a/src/databricks/sql/telemetry/NoopTelemetryClient.py
+++ /dev/null
@@ -1,11 +0,0 @@
-class NoopTelemetryClient:
-    # A no-operation telemetry client that implements the same interface but does nothing
-
-    def export_event(self, event):
-        pass
-
-    def flush(self):
-        pass
-
-    def close(self):
-        pass
diff --git a/src/databricks/sql/telemetry/TelemetryClient.py b/src/databricks/sql/telemetry/TelemetryClient.py
deleted file mode 100644
index 22ee2ef67..000000000
--- a/src/databricks/sql/telemetry/TelemetryClient.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import threading
-import time
-import json
-import requests
-from concurrent.futures import ThreadPoolExecutor
-
-
-class TelemetryClient:
-    def __init__(
-        self,
-        host,
-        connection_uuid,
-        auth_provider=None,
-        is_authenticated=False,
-        batch_size=200,
-    ):
-        self.host = host
-        self.connection_uuid = connection_uuid
-        self.auth_provider = auth_provider
-        self.is_authenticated = is_authenticated
-        self.batch_size = batch_size
-        self.events_batch = []
-        self.lock = threading.Lock()
-        self.executor = ThreadPoolExecutor(
-            max_workers=5
-        )  # Thread pool for async operations
-        self.DriverConnectionParameters = None
-
-    def export_event(self, event):
-        pass
-
-    def flush(self):
-        pass
-
-    def close(self):
-        pass
diff --git a/src/databricks/sql/telemetry/TelemetryHelper.py b/src/databricks/sql/telemetry/TelemetryHelper.py
deleted file mode 100644
index e2e635859..000000000
--- a/src/databricks/sql/telemetry/TelemetryHelper.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import platform
-import sys
-import uuid
-import time
-from typing import Optional
-
-from databricks.sql import __version__
-from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
-
-
-class TelemetryHelper:
-
-    # Singleton instance of DriverSystemConfiguration
-    _DRIVER_SYSTEM_CONFIGURATION = None
-
-    @classmethod
-    def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
-        if cls._DRIVER_SYSTEM_CONFIGURATION is None:
-            cls._DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration(
-                driverName="Databricks SQL Python Connector",
-                driverVersion=__version__,
-                runtimeName=f"Python {sys.version.split()[0]}",
-                runtimeVendor=platform.python_implementation(),
-                runtimeVersion=platform.python_version(),
-                osName=platform.system(),
-                osVersion=platform.release(),
-                osArch=platform.machine(),
-                clientAppName=None,
-                localeName=f"{platform.system()}_{platform.release()}",
-                charSetEncoding=sys.getdefaultencoding(),
-            )
-        return cls._DRIVER_SYSTEM_CONFIGURATION
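
Patch 7 removes both client classes while client.py keeps computing self.telemetry_enabled, and _close() keeps its hasattr guard, so the connection no longer assigns any telemetry_client. The deleted NoopTelemetryClient followed the null-object pattern, which is what let call sites skip per-call enabled checks; a one-screen reminder of the idea (illustrative, mirroring the removed interface):

    class NoopTelemetryClient:
        # Null object: same interface as a real client, every method a no-op,
        # so callers never need to branch on whether telemetry is enabled.

        def export_event(self, event):
            pass

        def flush(self):
            pass

        def close(self):
            pass


    telemetry_client = NoopTelemetryClient()  # safe default when telemetry is off
    telemetry_client.export_event({"name": "query.start"})  # does nothing, raises nothing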