Skip to content

Commit 6e6d3e3

Browse files
committed
updated latency_logger
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 3b4a735 commit 6e6d3e3

File tree

1 file changed

+8
-48
lines changed

1 file changed

+8
-48
lines changed

src/databricks/sql/telemetry/latency_logger.py

Lines changed: 8 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,36 +11,30 @@
1111
from databricks.sql.utils import ColumnQueue
1212

1313
# Helper to get statement_id/query_id from instance if available
14-
def _get_statement_id_from_instance(instance) -> Optional[str]:
14+
def _get_statement_id(instance) -> Optional[str]:
1515
"""
1616
Get statement ID from an instance using various methods:
1717
1. For Cursor: Use query_id property which returns UUID from active_op_handle
1818
2. For ResultSet: Use command_id which contains operationId
1919
3. For objects with active_op_handle: Convert operationId to UUID string
2020
4. For ThriftBackend: Get operation ID from session_handle if available
2121
"""
22-
# Case 1: Direct query_id property (Cursor class)
2322
if hasattr(instance, "query_id"):
2423
return instance.query_id
2524

26-
# Case 2: Direct command_id (ResultSet class)
2725
if hasattr(instance, "command_id"):
2826
return instance.guid_to_hex_id(instance.command_id.operationId.guid)
2927

30-
# Case 3: Direct active_op_handle (Cursor class)
3128
if hasattr(instance, "active_op_handle"):
3229
return instance.guid_to_hex_id(instance.active_op_handle.operationId.guid)
3330

34-
# Case 4: For ThriftBackend, get operation ID from session_handle
35-
if instance.__class__.__name__ == "ThriftBackend" and hasattr(
36-
instance, "_session_handle"
37-
):
31+
if hasattr(instance, "_session_handle") and hasattr(instance, "handle_to_hex_id"):
3832
return instance.handle_to_hex_id(instance._session_handle)
3933

4034
return None
4135

4236

43-
def _get_connection_uuid_from_instance(instance) -> Optional[str]:
37+
def _get_connection_uuid(instance) -> Optional[str]:
4438
if hasattr(instance, "connection") and instance.connection:
4539
return instance.connection.get_session_id_hex()
4640
if hasattr(instance, "get_session_id_hex"):
@@ -90,17 +84,14 @@ def _get_execution_result(instance) -> ExecutionResultFormat:
9084
2. For Cursor: Check through active_result_set
9185
3. For ThriftBackend: Check result format from server
9286
"""
93-
# Check if using cloud fetch
9487
if hasattr(instance, "_use_cloud_fetch") and instance._use_cloud_fetch:
9588
return ExecutionResultFormat.EXTERNAL_LINKS
9689

97-
# Check result format from ResultSet
9890
if hasattr(instance, "active_result_set") and instance.active_result_set:
9991
if isinstance(instance.active_result_set.results, ColumnQueue):
10092
return ExecutionResultFormat.COLUMNAR_INLINE
10193
return ExecutionResultFormat.INLINE_ARROW
10294

103-
# Check result format from ThriftBackend
10495
if hasattr(instance, "thrift_backend") and instance.thrift_backend:
10596
if hasattr(instance.thrift_backend, "_use_arrow_native_complex_types"):
10697
return ExecutionResultFormat.INLINE_ARROW
@@ -110,46 +101,15 @@ def _get_execution_result(instance) -> ExecutionResultFormat:
110101

111102
def _get_retry_count(instance) -> int:
112103
"""
113-
Get retry count from instance:
114-
1. Direct retry_policy attribute (ThriftBackend)
115-
2. Through thrift_backend attribute (Cursor/ResultSet)
116-
3. Through connection attribute (Cursor/ResultSet)
104+
Get retry count from instance by checking retry_policy.history length.
105+
The retry_policy is only accessible through thrift_backend.
117106
"""
118-
# Case 1: Direct retry_policy (ThriftBackend)
119-
if hasattr(instance, "retry_policy") and instance.retry_policy:
120-
# Get attempts from history length
121-
return (
122-
len(instance.retry_policy.history) if instance.retry_policy.history else 0
123-
)
124-
125-
# Case 2: Through thrift_backend (Cursor/ResultSet)
126107
if hasattr(instance, "thrift_backend") and instance.thrift_backend:
127108
if (
128109
hasattr(instance.thrift_backend, "retry_policy")
129110
and instance.thrift_backend.retry_policy
130111
):
131-
return (
132-
len(instance.thrift_backend.retry_policy.history)
133-
if instance.thrift_backend.retry_policy.history
134-
else 0
135-
)
136-
137-
# Case 3: Through connection (Cursor/ResultSet)
138-
if hasattr(instance, "connection") and instance.connection:
139-
if (
140-
hasattr(instance.connection, "thrift_backend")
141-
and instance.connection.thrift_backend
142-
):
143-
if (
144-
hasattr(instance.connection.thrift_backend, "retry_policy")
145-
and instance.connection.thrift_backend.retry_policy
146-
):
147-
return (
148-
len(instance.connection.thrift_backend.retry_policy.history)
149-
if instance.connection.thrift_backend.retry_policy.history
150-
else 0
151-
)
152-
112+
return len(instance.thrift_backend.retry_policy.history)
153113
return 0
154114

155115

@@ -166,7 +126,7 @@ def wrapper(self, *args, **kwargs):
166126
end_time = time.perf_counter()
167127
duration_ms = int((end_time - start_time) * 1000)
168128

169-
connection_uuid = _get_connection_uuid_from_instance(self)
129+
connection_uuid = _get_connection_uuid(self)
170130

171131
if connection_uuid:
172132
# Check if this is a volume operation
@@ -184,7 +144,7 @@ def wrapper(self, *args, **kwargs):
184144
)
185145
else:
186146
# Regular SQL execution
187-
statement_id = _get_statement_id_from_instance(self)
147+
statement_id = _get_statement_id(self)
188148
statement_type = _get_statement_type(func.__name__)
189149
is_compressed = _get_is_compressed(self)
190150
execution_result = _get_execution_result(self)

0 commit comments

Comments
 (0)