Skip to content

Commit 7430b94

Browse files
committed
Merge branch 'telemetry' into telemetry-unauth
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
2 parents 341bb75 + 0dfe0f4 commit 7430b94

File tree

4 files changed

+475
-505
lines changed

4 files changed

+475
-505
lines changed

src/databricks/sql/client.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
DriverConnectionParameters,
6565
HostDetails,
6666
)
67-
67+
from databricks.sql.telemetry.latency_logger import log_latency
68+
from databricks.sql.telemetry.models.enums import StatementType
6869

6970
logger = logging.getLogger(__name__)
7071

@@ -770,6 +771,7 @@ def _handle_staging_operation(
770771
session_id_hex=self.connection.get_session_id_hex(),
771772
)
772773

774+
@log_latency(StatementType.SQL)
773775
def _handle_staging_put(
774776
self, presigned_url: str, local_file: str, headers: Optional[dict] = None
775777
):
@@ -809,6 +811,7 @@ def _handle_staging_put(
809811
+ "but not yet applied on the server. It's possible this command may fail later."
810812
)
811813

814+
@log_latency(StatementType.SQL)
812815
def _handle_staging_get(
813816
self, local_file: str, presigned_url: str, headers: Optional[dict] = None
814817
):
@@ -836,6 +839,7 @@ def _handle_staging_get(
836839
with open(local_file, "wb") as fp:
837840
fp.write(r.content)
838841

842+
@log_latency(StatementType.SQL)
839843
def _handle_staging_remove(
840844
self, presigned_url: str, headers: Optional[dict] = None
841845
):
@@ -849,6 +853,7 @@ def _handle_staging_remove(
849853
session_id_hex=self.connection.get_session_id_hex(),
850854
)
851855

856+
@log_latency(StatementType.QUERY)
852857
def execute(
853858
self,
854859
operation: str,
@@ -939,6 +944,7 @@ def execute(
939944

940945
return self
941946

947+
@log_latency(StatementType.QUERY)
942948
def execute_async(
943949
self,
944950
operation: str,
@@ -1064,6 +1070,7 @@ def executemany(self, operation, seq_of_parameters):
10641070
self.execute(operation, parameters)
10651071
return self
10661072

1073+
@log_latency(StatementType.METADATA)
10671074
def catalogs(self) -> "Cursor":
10681075
"""
10691076
Get all available catalogs.
@@ -1087,6 +1094,7 @@ def catalogs(self) -> "Cursor":
10871094
)
10881095
return self
10891096

1097+
@log_latency(StatementType.METADATA)
10901098
def schemas(
10911099
self, catalog_name: Optional[str] = None, schema_name: Optional[str] = None
10921100
) -> "Cursor":
@@ -1115,6 +1123,7 @@ def schemas(
11151123
)
11161124
return self
11171125

1126+
@log_latency(StatementType.METADATA)
11181127
def tables(
11191128
self,
11201129
catalog_name: Optional[str] = None,
@@ -1150,6 +1159,7 @@ def tables(
11501159
)
11511160
return self
11521161

1162+
@log_latency(StatementType.METADATA)
11531163
def columns(
11541164
self,
11551165
catalog_name: Optional[str] = None,
@@ -1404,6 +1414,7 @@ def _fill_results_buffer(self):
14041414
self.results = results
14051415
self.has_more_rows = has_more_rows
14061416

1417+
@log_latency()
14071418
def _convert_columnar_table(self, table):
14081419
column_names = [c[0] for c in self.description]
14091420
ResultRow = Row(*column_names)
@@ -1416,6 +1427,7 @@ def _convert_columnar_table(self, table):
14161427

14171428
return result
14181429

1430+
@log_latency()
14191431
def _convert_arrow_table(self, table):
14201432
column_names = [c[0] for c in self.description]
14211433
ResultRow = Row(*column_names)
@@ -1458,6 +1470,7 @@ def _convert_arrow_table(self, table):
14581470
def rownumber(self):
14591471
return self._next_row_index
14601472

1473+
@log_latency()
14611474
def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
14621475
"""
14631476
Fetch the next set of rows of a query result, returning a PyArrow table.
@@ -1500,6 +1513,7 @@ def merge_columnar(self, result1, result2):
15001513
]
15011514
return ColumnTable(merged_result, result1.column_names)
15021515

1516+
@log_latency()
15031517
def fetchmany_columnar(self, size: int):
15041518
"""
15051519
Fetch the next set of rows of a query result, returning a Columnar Table.
@@ -1525,6 +1539,7 @@ def fetchmany_columnar(self, size: int):
15251539

15261540
return results
15271541

1542+
@log_latency()
15281543
def fetchall_arrow(self) -> "pyarrow.Table":
15291544
"""Fetch all (remaining) rows of a query result, returning them as a PyArrow table."""
15301545
results = self.results.remaining_rows()
@@ -1551,6 +1566,7 @@ def fetchall_arrow(self) -> "pyarrow.Table":
15511566
return pyarrow.Table.from_pydict(data)
15521567
return results
15531568

1569+
@log_latency()
15541570
def fetchall_columnar(self):
15551571
"""Fetch all (remaining) rows of a query result, returning them as a Columnar table."""
15561572
results = self.results.remaining_rows()
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import time
2+
import functools
3+
from typing import Optional
4+
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
5+
from databricks.sql.telemetry.models.event import (
6+
SqlExecutionEvent,
7+
)
8+
from databricks.sql.telemetry.models.enums import ExecutionResultFormat, StatementType
9+
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
10+
from uuid import UUID
11+
12+
13+
class TelemetryExtractor:
    """
    Base class for pulling telemetry fields out of arbitrary wrapped objects.

    Instances act as a transparent proxy: any attribute not defined here falls
    through to the wrapped object via ``__getattr__``, while subclasses
    override the ``get_*`` hooks to extract telemetry-specific data. The base
    hooks are deliberate no-ops (they return ``None``) so callers can invoke
    them uniformly even when a subclass does not supply a value.
    """

    def __init__(self, obj):
        """
        Wrap *obj* for telemetry extraction.

        Args:
            obj: The object telemetry information will be extracted from.
        """
        self._obj = obj

    def __getattr__(self, name):
        """
        Forward unknown attribute lookups to the wrapped object.

        Args:
            name (str): Attribute name being accessed.

        Returns:
            The corresponding attribute value from the wrapped object.
        """
        return getattr(self._obj, name)

    # Default hooks: subclasses override the ones they can answer.
    def get_session_id_hex(self):
        pass

    def get_statement_id(self):
        pass

    def get_is_compressed(self):
        pass

    def get_execution_result(self):
        pass

    def get_retry_count(self):
        pass
56+
57+
58+
class CursorExtractor(TelemetryExtractor):
    """
    Telemetry extractor specialized for Cursor objects.

    Reads the statement id, session id, compression flag, result format and
    retry count off a wrapped cursor and its connection/backend.
    """

    def get_statement_id(self) -> Optional[str]:
        return self.query_id

    def get_session_id_hex(self) -> Optional[str]:
        return self.connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        return self.connection.lz4_compression

    def get_execution_result(self) -> ExecutionResultFormat:
        result_set = self.active_result_set
        if result_set is None:
            return ExecutionResultFormat.FORMAT_UNSPECIFIED

        queue = result_set.results
        if isinstance(queue, ColumnQueue):
            return ExecutionResultFormat.COLUMNAR_INLINE
        if isinstance(queue, CloudFetchQueue):
            return ExecutionResultFormat.EXTERNAL_LINKS
        if isinstance(queue, ArrowQueue):
            return ExecutionResultFormat.INLINE_ARROW
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        # A missing or falsy retry_policy means no retries were recorded.
        retry_policy = getattr(self.thrift_backend, "retry_policy", None)
        if retry_policy:
            return len(retry_policy.history)
        return 0
94+
95+
96+
class ResultSetExtractor(TelemetryExtractor):
    """
    Telemetry extractor specialized for ResultSet objects.

    Reads the operation id, session id, compression flag, result format and
    retry count off a wrapped result set and its connection/backend.
    """

    def get_statement_id(self) -> Optional[str]:
        if not self.command_id:
            return None
        # The Thrift operation id is a raw GUID; render it as a UUID string.
        return str(UUID(bytes=self.command_id.operationId.guid))

    def get_session_id_hex(self) -> Optional[str]:
        return self.connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        return self.lz4_compressed

    def get_execution_result(self) -> ExecutionResultFormat:
        queue = self.results
        if isinstance(queue, ColumnQueue):
            return ExecutionResultFormat.COLUMNAR_INLINE
        if isinstance(queue, CloudFetchQueue):
            return ExecutionResultFormat.EXTERNAL_LINKS
        if isinstance(queue, ArrowQueue):
            return ExecutionResultFormat.INLINE_ARROW
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        # A missing or falsy retry_policy means no retries were recorded.
        retry_policy = getattr(self.thrift_backend, "retry_policy", None)
        if retry_policy:
            return len(retry_policy.history)
        return 0
131+
132+
133+
def get_extractor(obj):
    """
    Return the telemetry extractor matching *obj*'s type.

    Dispatch is by class name (rather than isinstance) so this module does not
    need to import Cursor/ResultSet and create a circular import.

    Args:
        obj: The object to wrap — expected to be a Cursor or a ResultSet.

    Returns:
        TelemetryExtractor: A ``CursorExtractor`` for Cursor objects or a
        ``ResultSetExtractor`` for ResultSet objects.

    Raises:
        NotImplementedError: For any other object type.
    """
    class_name = obj.__class__.__name__
    if class_name == "Cursor":
        return CursorExtractor(obj)
    if class_name == "ResultSet":
        return ResultSetExtractor(obj)
    raise NotImplementedError(f"No extractor found for {obj.__class__.__name__}")
156+
157+
158+
def log_latency(statement_type: StatementType = StatementType.NONE):
    """
    Decorator for logging execution latency and telemetry information.

    Measures the wrapped method's execution time and emits a telemetry event
    describing the operation (latency, statement information, execution
    context).

    The decorator:
    - Measures execution time with ``time.perf_counter``.
    - Extracts telemetry information from the method's object (``self``)
      via the extractor system.
    - Builds a ``SqlExecutionEvent`` with the execution details.
    - Sends the telemetry data via the session's ``TelemetryClient``.

    Args:
        statement_type (StatementType): The type of SQL statement being
            executed. Defaults to ``StatementType.NONE``.

    Usage:
        @log_latency(StatementType.SQL)
        def execute(self, query):
            # Method implementation
            pass

    Returns:
        function: A decorator that wraps methods to add latency logging.

    Note:
        The wrapped method's object (``self``) must be compatible with the
        telemetry extractor system (e.g. Cursor or ResultSet objects).
        Telemetry emission is strictly best-effort: any failure while
        extracting or exporting telemetry is swallowed so it can never mask
        the wrapped method's return value or original exception.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            start_time = time.perf_counter()
            try:
                return func(self, *args, **kwargs)
            finally:
                # Bug fix: the telemetry section below used to run unguarded
                # inside this ``finally``. ``get_extractor`` raises
                # NotImplementedError for unsupported objects, and any error
                # from event construction or the telemetry client would
                # replace the method's result or in-flight exception.
                try:
                    duration_ms = int((time.perf_counter() - start_time) * 1000)

                    def _safe_call(func_to_call):
                        """Call a function; return None on any exception."""
                        try:
                            return func_to_call()
                        except Exception:
                            return None

                    extractor = get_extractor(self)
                    session_id_hex = _safe_call(extractor.get_session_id_hex)
                    statement_id = _safe_call(extractor.get_statement_id)

                    sql_exec_event = SqlExecutionEvent(
                        statement_type=statement_type,
                        is_compressed=_safe_call(extractor.get_is_compressed),
                        execution_result=_safe_call(extractor.get_execution_result),
                        retry_count=_safe_call(extractor.get_retry_count),
                    )

                    telemetry_client = TelemetryClientFactory.get_telemetry_client(
                        session_id_hex
                    )
                    telemetry_client.export_latency_log(
                        latency_ms=duration_ms,
                        sql_execution_event=sql_exec_event,
                        sql_statement_id=statement_id,
                    )
                except Exception:
                    # Telemetry must never break the caller's query path.
                    pass

        return wrapper

    return decorator

0 commit comments

Comments
 (0)