Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/query_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
) as connection:

with connection.cursor() as cursor:
cursor.execute("SELECT * FROM default.diamonds LIMIT 2")
cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1")
result = cursor.fetchall()

for row in result:
Expand Down
20 changes: 20 additions & 0 deletions examples/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os
import databricks.sql as sql

# Example: run one query with client-side telemetry enabled.
#
# Both the connection and the cursor are opened with ``with`` so they are
# closed even if execute()/fetchall() raises; explicit close() calls at the
# end of the script would be skipped on an exception.
with sql.connect(
    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_TOKEN"],
    enable_telemetry=True,  # Enable telemetry
    # Set batch size to 1 (presumably so each event is sent right away
    # instead of waiting for a larger batch -- TODO confirm).
    telemetry_batch_size=1,
) as conn:
    # Execute a simple query to generate telemetry
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1"
        )
        cursor.fetchall()
20 changes: 19 additions & 1 deletion src/databricks/sql/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
from typing import Dict, Tuple, List, Optional, Any, Union, Sequence

import pandas

try:
Expand Down Expand Up @@ -214,6 +213,12 @@ def read(self) -> Optional[OAuthToken]:
# use_cloud_fetch
# Enable use of cloud fetch to extract large query results in parallel via cloud storage

logger.debug(
"Connection.__init__(server_hostname=%s, http_path=%s)",
server_hostname,
http_path,
)

if access_token:
access_token_kv = {"access_token": access_token}
kwargs = {**kwargs, **access_token_kv}
Expand All @@ -228,6 +233,13 @@ def read(self) -> Optional[OAuthToken]:
server_hostname, **kwargs
)

self.server_telemetry_enabled = True
self.client_telemetry_enabled = kwargs.get("enable_telemetry", False)
self.telemetry_enabled = (
self.client_telemetry_enabled and self.server_telemetry_enabled
)
telemetry_batch_size = kwargs.get("telemetry_batch_size", 200)

user_agent_entry = kwargs.get("user_agent_entry")
if user_agent_entry is None:
user_agent_entry = kwargs.get("_user_agent_entry")
Expand Down Expand Up @@ -413,6 +425,9 @@ def _close(self, close_cursors=True) -> None:

self.open = False

if hasattr(self, "telemetry_client"):
self.telemetry_client.close()

def commit(self):
"""No-op because Databricks does not support transactions"""
pass
Expand Down Expand Up @@ -787,6 +802,9 @@ def execute(

:returns self
"""
logger.debug(
"Cursor.execute(operation=%s, parameters=%s)", operation, parameters
)

param_approach = self._determine_parameter_approach(parameters)
if param_approach == ParameterApproach.NONE:
Expand Down
24 changes: 24 additions & 0 deletions src/databricks/sql/telemetry/DriverConnectionParameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.HostDetails import HostDetails
from databricks.sql.telemetry.enums.AuthMech import AuthMech
from databricks.sql.telemetry.enums.AuthFlow import AuthFlow
from databricks.sql.telemetry.enums.DatabricksClientType import DatabricksClientType


@dataclass
class DriverConnectionParameters:
    """Connection settings attached to a driver telemetry event.

    The class only stores the values it is given; it performs no validation.
    """

    http_path: str
    driver_mode: DatabricksClientType
    host_details: HostDetails
    auth_mech: AuthMech
    auth_flow: AuthFlow
    auth_scope: str
    discovery_url: str
    allowed_volume_ingestion_paths: str
    enable_complex_datatype_support: bool
    azure_tenant_id: str
    socket_timeout: int

    def to_json(self):
        """Return this record as a JSON object string.

        ``asdict`` expands the nested ``host_details`` dataclass into a dict.
        Enum members (``driver_mode``, ``auth_mech``, ``auth_flow``) are
        written as their ``value``: ``json.dumps`` cannot encode plain
        ``Enum`` members and would otherwise raise ``TypeError``.

        Raises:
            TypeError: if a field holds a value that is neither JSON
                serializable nor an ``Enum`` member.
        """
        # Local import: this module does not import ``enum`` at file level.
        from enum import Enum

        def encode_enum(value):
            # Called by json.dumps only for objects it cannot encode itself.
            if isinstance(value, Enum):
                return value.value
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=encode_enum)
11 changes: 11 additions & 0 deletions src/databricks/sql/telemetry/DriverErrorInfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import json
from dataclasses import dataclass, asdict


@dataclass
class DriverErrorInfo:
    """Name and stack trace of an error, as carried in a telemetry event."""

    error_name: str
    stack_trace: str

    def to_json(self):
        """Serialize the fields of this record to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)
37 changes: 37 additions & 0 deletions src/databricks/sql/telemetry/DriverSystemConfiguration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import json
from dataclasses import dataclass, asdict
import platform
import sys
import locale
from databricks.sql import __version__


@dataclass
class DriverSystemConfiguration:
    """Snapshot of the driver and host environment for telemetry.

    Every field is filled in by ``__init__`` from the running interpreter and
    platform, so the constructor takes no arguments. Because the class defines
    its own ``__init__``, ``@dataclass`` does not generate one; the decorator
    still supplies the field metadata that ``asdict`` relies on.
    """

    driver_version: str
    os_name: str
    os_version: str
    os_arch: str
    runtime_name: str
    runtime_version: str
    runtime_vendor: str
    client_app_name: str
    # May be None when the current locale cannot be determined.
    locale_name: str
    driver_name: str
    char_set_encoding: str

    def __init__(self):
        self.driver_version = __version__
        self.os_name = platform.system()
        self.os_version = platform.version()
        self.os_arch = platform.machine()
        self.runtime_name = platform.python_implementation()
        self.runtime_version = platform.python_version()
        self.runtime_vendor = sys.implementation.name
        self.client_app_name = "databricks-sql-python"
        self.locale_name = self._current_locale_name()
        self.driver_name = "databricks-sql-python"
        self.char_set_encoding = "UTF-8"

    @staticmethod
    def _current_locale_name():
        """Return the language code of the current locale, or None if unknown."""
        # locale.getdefaultlocale() is deprecated since Python 3.11 and is
        # scheduled for removal in 3.15; locale.getlocale() is the documented
        # replacement.
        try:
            return locale.getlocale()[0]
        except ValueError:
            # getlocale() raises ValueError for locale names it cannot parse.
            # Collecting telemetry metadata should not fail construction.
            return None

    def to_json(self):
        """Return this record as a JSON object string."""
        return json.dumps(asdict(self))
14 changes: 14 additions & 0 deletions src/databricks/sql/telemetry/DriverVolumeOperation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.enums.DriverVolumeOperationType import (
DriverVolumeOperationType,
)


@dataclass
class DriverVolumeOperation:
    """Type and path of a volume operation, as carried in a telemetry event."""

    volume_operation_type: DriverVolumeOperationType
    volume_path: str

    def to_json(self):
        """Return this record as a JSON object string.

        ``volume_operation_type`` is an ``Enum`` member and is written as its
        ``value``: ``json.dumps`` cannot encode plain ``Enum`` members and
        would otherwise raise ``TypeError``.

        Raises:
            TypeError: if a field holds a value that is neither JSON
                serializable nor an ``Enum`` member.
        """
        # Local import: this module does not import ``enum`` at file level.
        from enum import Enum

        def encode_enum(value):
            # Called by json.dumps only for objects it cannot encode itself.
            if isinstance(value, Enum):
                return value.value
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=encode_enum)
11 changes: 11 additions & 0 deletions src/databricks/sql/telemetry/FrontendLogContext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext


@dataclass
class FrontendLogContext:
    """Wrapper holding the client context of a frontend log record."""

    client_context: TelemetryClientContext

    def to_json(self):
        """Serialize this record, including the nested context, to JSON."""
        payload = asdict(self)
        return json.dumps(payload)
11 changes: 11 additions & 0 deletions src/databricks/sql/telemetry/FrontendLogEntry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent


@dataclass
class FrontendLogEntry:
    """Wrapper holding the driver telemetry event of a frontend log record."""

    sql_driver_log: TelemetryEvent

    def to_json(self):
        """Return this record as a JSON object string.

        ``asdict`` expands nested dataclasses into dicts but leaves ``Enum``
        members untouched. The nested event can carry such members, so they
        are written as their ``value``; without this ``json.dumps`` raises
        ``TypeError``.

        Raises:
            TypeError: if a nested value is neither JSON serializable nor an
                ``Enum`` member.
        """
        # Local import: this module does not import ``enum`` at file level.
        from enum import Enum

        def encode_enum(value):
            # Called by json.dumps only for objects it cannot encode itself.
            if isinstance(value, Enum):
                return value.value
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=encode_enum)
11 changes: 11 additions & 0 deletions src/databricks/sql/telemetry/HostDetails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import json
from dataclasses import dataclass, asdict


@dataclass
class HostDetails:
    """URL and port of a host, as carried in telemetry connection parameters."""

    host_url: str
    port: int

    def to_json(self):
        """Serialize the fields of this record to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)
15 changes: 15 additions & 0 deletions src/databricks/sql/telemetry/SqlExecutionEvent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.enums.StatementType import StatementType
from databricks.sql.telemetry.enums.ExecutionResultFormat import ExecutionResultFormat


@dataclass
class SqlExecutionEvent:
    """Details of one SQL execution, as carried in a telemetry event."""

    statement_type: StatementType
    is_compressed: bool
    execution_result: ExecutionResultFormat
    retry_count: int

    def to_json(self):
        """Return this record as a JSON object string.

        ``statement_type`` and ``execution_result`` are ``Enum`` members and
        are written as their ``value``: ``json.dumps`` cannot encode plain
        ``Enum`` members and would otherwise raise ``TypeError``.

        Raises:
            TypeError: if a field holds a value that is neither JSON
                serializable nor an ``Enum`` member.
        """
        # Local import: this module does not import ``enum`` at file level.
        from enum import Enum

        def encode_enum(value):
            # Called by json.dumps only for objects it cannot encode itself.
            if isinstance(value, Enum):
                return value.value
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=encode_enum)
11 changes: 11 additions & 0 deletions src/databricks/sql/telemetry/TelemetryClientContext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import dataclass, asdict
import json


@dataclass
class TelemetryClientContext:
    """Timestamp (in milliseconds) and user agent of a telemetry record."""

    timestamp_millis: int
    user_agent: str

    def to_json(self):
        """Serialize the fields of this record to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)
25 changes: 25 additions & 0 deletions src/databricks/sql/telemetry/TelemetryEvent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
from databricks.sql.telemetry.DriverConnectionParameters import (
DriverConnectionParameters,
)
from databricks.sql.telemetry.DriverVolumeOperation import DriverVolumeOperation
from databricks.sql.telemetry.SqlExecutionEvent import SqlExecutionEvent
from databricks.sql.telemetry.DriverErrorInfo import DriverErrorInfo


@dataclass
class TelemetryEvent:
    """One driver telemetry event.

    Groups the session and statement identifiers with the system
    configuration, connection parameters, volume operation, SQL execution
    details, error information and latency of the event. Values are stored as
    given; no validation is performed.
    """

    session_id: str
    sql_statement_id: str
    system_configuration: DriverSystemConfiguration
    driver_connection_params: DriverConnectionParameters
    auth_type: str
    vol_operation: DriverVolumeOperation
    sql_operation: SqlExecutionEvent
    error_info: DriverErrorInfo
    # NOTE(review): unit is not stated here -- presumably milliseconds; confirm.
    latency: int

    def to_json(self):
        """Return this event as a JSON object string.

        ``asdict`` expands the nested dataclasses into dicts but leaves
        ``Enum`` members untouched. The nested connection-parameter,
        volume-operation and SQL-execution records hold such members, so they
        are written as their ``value``; without this ``json.dumps`` raises
        ``TypeError``.

        Raises:
            TypeError: if a nested value is neither JSON serializable nor an
                ``Enum`` member.
        """
        # Local import: this module does not import ``enum`` at file level.
        from enum import Enum

        def encode_enum(value):
            # Called by json.dumps only for objects it cannot encode itself.
            if isinstance(value, Enum):
                return value.value
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=encode_enum)
15 changes: 15 additions & 0 deletions src/databricks/sql/telemetry/TelemetryFrontendLog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext
from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry


@dataclass
class TelemetryFrontendLog:
    """Top-level frontend log record: identifiers plus context and entry."""

    workspace_id: int
    frontend_log_event_id: str
    context: FrontendLogContext
    entry: FrontendLogEntry

    def to_json(self):
        """Return this record as a JSON object string.

        ``asdict`` expands the nested ``context`` and ``entry`` dataclasses
        into dicts but leaves ``Enum`` members untouched. The nested entry
        can carry such members, so they are written as their ``value``;
        without this ``json.dumps`` raises ``TypeError``.

        Raises:
            TypeError: if a nested value is neither JSON serializable nor an
                ``Enum`` member.
        """
        # Local import: this module does not import ``enum`` at file level.
        from enum import Enum

        def encode_enum(value):
            # Called by json.dumps only for objects it cannot encode itself.
            if isinstance(value, Enum):
                return value.value
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(asdict(self), default=encode_enum)
13 changes: 13 additions & 0 deletions src/databricks/sql/telemetry/TelemetryRequest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import json
from dataclasses import dataclass, asdict
from typing import List, Optional


@dataclass
class TelemetryRequest:
    """Body of a telemetry upload request.

    Field names are camelCase and are emitted unchanged as JSON keys.
    """

    uploadTime: int
    items: List[str]
    protoLogs: Optional[List[str]]

    def to_json(self):
        """Serialize the fields of this request to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)
13 changes: 13 additions & 0 deletions src/databricks/sql/telemetry/TelemetryResponse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import json
from dataclasses import dataclass, asdict
from typing import List, Optional


@dataclass
class TelemetryResponse:
    """Result of a telemetry upload: error messages and success counts.

    Field names are camelCase and are emitted unchanged as JSON keys.
    """

    errors: List[str]
    numSuccess: int
    numProtoSuccess: int

    def to_json(self):
        """Serialize the fields of this response to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)
8 changes: 8 additions & 0 deletions src/databricks/sql/telemetry/enums/AuthFlow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class AuthFlow(Enum):
    """Authentication flow names recorded in telemetry connection parameters.

    Each member's value is the lower-case form of its name.
    """

    TOKEN_PASSTHROUGH = "token_passthrough"
    CLIENT_CREDENTIALS = "client_credentials"
    BROWSER_BASED_AUTHENTICATION = "browser_based_authentication"
    AZURE_MANAGED_IDENTITIES = "azure_managed_identities"
7 changes: 7 additions & 0 deletions src/databricks/sql/telemetry/enums/AuthMech.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Enum


class AuthMech(Enum):
    """Authentication mechanism names recorded in telemetry connection parameters.

    Each member's value is the lower-case form of its name.
    """

    OTHER = "other"
    PAT = "pat"
    OAUTH = "oauth"
6 changes: 6 additions & 0 deletions src/databricks/sql/telemetry/enums/DatabricksClientType.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import Enum


class DatabricksClientType(Enum):
    """Client type names recorded as the driver mode in telemetry.

    Unlike the other telemetry enums, the values here are upper-case and
    identical to the member names.
    """

    SEA = "SEA"
    THRIFT = "THRIFT"
10 changes: 10 additions & 0 deletions src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from enum import Enum


class DriverVolumeOperationType(Enum):
    """Volume operation kinds recorded in telemetry.

    Each member's value is the lower-case form of its name.
    """

    TYPE_UNSPECIFIED = "type_unspecified"
    PUT = "put"
    GET = "get"
    DELETE = "delete"
    LIST = "list"
    QUERY = "query"
8 changes: 8 additions & 0 deletions src/databricks/sql/telemetry/enums/ExecutionResultFormat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class ExecutionResultFormat(Enum):
    """Result format names recorded for a SQL execution in telemetry.

    Each member's value is the lower-case form of its name.
    """

    FORMAT_UNSPECIFIED = "format_unspecified"
    INLINE_ARROW = "inline_arrow"
    EXTERNAL_LINKS = "external_links"
    COLUMNAR_INLINE = "columnar_inline"
9 changes: 9 additions & 0 deletions src/databricks/sql/telemetry/enums/StatementType.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from enum import Enum


class StatementType(Enum):
    """Statement kinds recorded for a SQL execution in telemetry.

    Each member's value is the lower-case form of its name.
    """

    NONE = "none"
    QUERY = "query"
    SQL = "sql"
    UPDATE = "update"
    METADATA = "metadata"
Loading
Loading