Skip to content

Commit 4970db2

Browse files
committed
added functionality for export of failure logs
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 9dc7d52 commit 4970db2

File tree

4 files changed

+184
-64
lines changed

4 files changed

+184
-64
lines changed

src/databricks/sql/client.py

Lines changed: 58 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -421,7 +421,10 @@ def cursor(
421421
Will throw an Error if the connection has been closed.
422422
"""
423423
if not self.open:
424-
raise Error("Cannot create cursor from closed connection")
424+
raise Error(
425+
"Cannot create cursor from closed connection",
426+
connection_uuid=self.get_session_id_hex(),
427+
)
425428

426429
cursor = Cursor(
427430
self,
@@ -471,7 +474,10 @@ def commit(self):
471474
pass
472475

473476
def rollback(self):
474-
raise NotSupportedError("Transactions are not supported on Databricks")
477+
raise NotSupportedError(
478+
"Transactions are not supported on Databricks",
479+
connection_uuid=self.get_session_id_hex(),
480+
)
475481

476482

477483
class Cursor:
@@ -523,7 +529,10 @@ def __iter__(self):
523529
for row in self.active_result_set:
524530
yield row
525531
else:
526-
raise Error("There is no active result set")
532+
raise Error(
533+
"There is no active result set",
534+
connection_uuid=self.connection.get_session_id_hex(),
535+
)
527536

528537
def _determine_parameter_approach(
529538
self, params: Optional[TParameterCollection]
@@ -660,7 +669,10 @@ def _close_and_clear_active_result_set(self):
660669

661670
def _check_not_closed(self):
662671
if not self.open:
663-
raise Error("Attempting operation on closed cursor")
672+
raise Error(
673+
"Attempting operation on closed cursor",
674+
connection_uuid=self.connection.get_session_id_hex(),
675+
)
664676

665677
def _handle_staging_operation(
666678
self, staging_allowed_local_path: Union[None, str, List[str]]
@@ -678,7 +690,8 @@ def _handle_staging_operation(
678690
_staging_allowed_local_paths = staging_allowed_local_path
679691
else:
680692
raise Error(
681-
"You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands"
693+
"You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands",
694+
connection_uuid=self.connection.get_session_id_hex(),
682695
)
683696

684697
abs_staging_allowed_local_paths = [
@@ -707,7 +720,8 @@ def _handle_staging_operation(
707720
continue
708721
if not allow_operation:
709722
raise Error(
710-
"Local file operations are restricted to paths within the configured staging_allowed_local_path"
723+
"Local file operations are restricted to paths within the configured staging_allowed_local_path",
724+
connection_uuid=self.connection.get_session_id_hex(),
711725
)
712726

713727
# May be real headers, or could be json string
@@ -737,7 +751,8 @@ def _handle_staging_operation(
737751
else:
738752
raise Error(
739753
f"Operation {row.operation} is not supported. "
740-
+ "Supported operations are GET, PUT, and REMOVE"
754+
+ "Supported operations are GET, PUT, and REMOVE",
755+
connection_uuid=self.connection.get_session_id_hex(),
741756
)
742757

743758
def _handle_staging_put(
@@ -749,7 +764,10 @@ def _handle_staging_put(
749764
"""
750765

751766
if local_file is None:
752-
raise Error("Cannot perform PUT without specifying a local_file")
767+
raise Error(
768+
"Cannot perform PUT without specifying a local_file",
769+
connection_uuid=self.connection.get_session_id_hex(),
770+
)
753771

754772
with open(local_file, "rb") as fh:
755773
r = requests.put(url=presigned_url, data=fh, headers=headers)
@@ -766,7 +784,8 @@ def _handle_staging_put(
766784

767785
if r.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
768786
raise Error(
769-
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
787+
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
788+
connection_uuid=self.connection.get_session_id_hex(),
770789
)
771790

772791
if r.status_code == ACCEPTED:
@@ -784,15 +803,19 @@ def _handle_staging_get(
784803
"""
785804

786805
if local_file is None:
787-
raise Error("Cannot perform GET without specifying a local_file")
806+
raise Error(
807+
"Cannot perform GET without specifying a local_file",
808+
connection_uuid=self.connection.get_session_id_hex(),
809+
)
788810

789811
r = requests.get(url=presigned_url, headers=headers)
790812

791813
# response.ok verifies the status code is not between 400-600.
792814
# Any 2xx or 3xx will evaluate r.ok == True
793815
if not r.ok:
794816
raise Error(
795-
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
817+
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
818+
connection_uuid=self.connection.get_session_id_hex(),
796819
)
797820

798821
with open(local_file, "wb") as fp:
@@ -807,7 +830,8 @@ def _handle_staging_remove(
807830

808831
if not r.ok:
809832
raise Error(
810-
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
833+
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
834+
connection_uuid=self.connection.get_session_id_hex(),
811835
)
812836

813837
def execute(
@@ -1006,7 +1030,8 @@ def get_async_execution_result(self):
10061030
return self
10071031
else:
10081032
raise Error(
1009-
f"get_execution_result failed with Operation status {operation_state}"
1033+
f"get_execution_result failed with Operation status {operation_state}",
1034+
connection_uuid=self.connection.get_session_id_hex(),
10101035
)
10111036

10121037
def executemany(self, operation, seq_of_parameters):
@@ -1156,7 +1181,10 @@ def fetchall(self) -> List[Row]:
11561181
if self.active_result_set:
11571182
return self.active_result_set.fetchall()
11581183
else:
1159-
raise Error("There is no active result set")
1184+
raise Error(
1185+
"There is no active result set",
1186+
connection_uuid=self.connection.get_session_id_hex(),
1187+
)
11601188

11611189
def fetchone(self) -> Optional[Row]:
11621190
"""
@@ -1170,7 +1198,10 @@ def fetchone(self) -> Optional[Row]:
11701198
if self.active_result_set:
11711199
return self.active_result_set.fetchone()
11721200
else:
1173-
raise Error("There is no active result set")
1201+
raise Error(
1202+
"There is no active result set",
1203+
connection_uuid=self.connection.get_session_id_hex(),
1204+
)
11741205

11751206
def fetchmany(self, size: int) -> List[Row]:
11761207
"""
@@ -1192,21 +1223,30 @@ def fetchmany(self, size: int) -> List[Row]:
11921223
if self.active_result_set:
11931224
return self.active_result_set.fetchmany(size)
11941225
else:
1195-
raise Error("There is no active result set")
1226+
raise Error(
1227+
"There is no active result set",
1228+
connection_uuid=self.connection.get_session_id_hex(),
1229+
)
11961230

11971231
def fetchall_arrow(self) -> "pyarrow.Table":
11981232
self._check_not_closed()
11991233
if self.active_result_set:
12001234
return self.active_result_set.fetchall_arrow()
12011235
else:
1202-
raise Error("There is no active result set")
1236+
raise Error(
1237+
"There is no active result set",
1238+
connection_uuid=self.connection.get_session_id_hex(),
1239+
)
12031240

12041241
def fetchmany_arrow(self, size) -> "pyarrow.Table":
12051242
self._check_not_closed()
12061243
if self.active_result_set:
12071244
return self.active_result_set.fetchmany_arrow(size)
12081245
else:
1209-
raise Error("There is no active result set")
1246+
raise Error(
1247+
"There is no active result set",
1248+
connection_uuid=self.connection.get_session_id_hex(),
1249+
)
12101250

12111251
def cancel(self) -> None:
12121252
"""

src/databricks/sql/exc.py

Lines changed: 17 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,10 @@
11
import json
22
import logging
3+
import traceback
34

4-
logger = logging.getLogger(__name__)
5+
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
56

7+
logger = logging.getLogger(__name__)
68

79
### PEP-249 Mandated ###
810
class Error(Exception):
@@ -11,10 +13,23 @@ class Error(Exception):
1113
`context`: Optional extra context about the error. MUST be JSON serializable
1214
"""
1315

14-
def __init__(self, message=None, context=None, *args, **kwargs):
16+
def __init__(
17+
self, message=None, context=None, connection_uuid=None, *args, **kwargs
18+
):
1519
super().__init__(message, *args, **kwargs)
1620
self.message = message
1721
self.context = context or {}
22+
self.connection_uuid = connection_uuid
23+
24+
error_name = self.__class__.__name__
25+
if self.connection_uuid:
26+
try:
27+
telemetry_client = TelemetryClientFactory.get_telemetry_client(
28+
self.connection_uuid
29+
)
30+
telemetry_client.export_failure_log(error_name, self.message)
31+
except Exception as telemetry_error:
32+
logger.error(f"Failed to send error to telemetry: {telemetry_error}")
1833

1934
def __str__(self):
2035
return self.message

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 49 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
from databricks.sql.telemetry.models.event import (
99
TelemetryEvent,
1010
DriverSystemConfiguration,
11+
DriverErrorInfo,
1112
)
1213
from databricks.sql.telemetry.models.frontend_logs import (
1314
TelemetryFrontendLog,
@@ -26,30 +27,33 @@
2627
import uuid
2728
import locale
2829
from abc import ABC, abstractmethod
29-
from databricks.sql import __version__
3030

3131
logger = logging.getLogger(__name__)
3232

3333

3434
class TelemetryHelper:
3535
"""Helper class for getting telemetry related information."""
3636

37-
_DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration(
38-
driver_name="Databricks SQL Python Connector",
39-
driver_version=__version__,
40-
runtime_name=f"Python {sys.version.split()[0]}",
41-
runtime_vendor=platform.python_implementation(),
42-
runtime_version=platform.python_version(),
43-
os_name=platform.system(),
44-
os_version=platform.release(),
45-
os_arch=platform.machine(),
46-
client_app_name=None, # TODO: Add client app name
47-
locale_name=locale.getlocale()[0] or locale.getdefaultlocale()[0],
48-
char_set_encoding=sys.getdefaultencoding(),
49-
)
37+
_DRIVER_SYSTEM_CONFIGURATION = None
5038

5139
@classmethod
5240
def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
41+
if cls._DRIVER_SYSTEM_CONFIGURATION is None:
42+
from databricks.sql import __version__
43+
44+
cls._DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration(
45+
driver_name="Databricks SQL Python Connector",
46+
driver_version=__version__,
47+
runtime_name=f"Python {sys.version.split()[0]}",
48+
runtime_vendor=platform.python_implementation(),
49+
runtime_version=platform.python_version(),
50+
os_name=platform.system(),
51+
os_version=platform.release(),
52+
os_arch=platform.machine(),
53+
client_app_name=None, # TODO: Add client app name
54+
locale_name=locale.getlocale()[0] or locale.getdefaultlocale()[0],
55+
char_set_encoding=sys.getdefaultencoding(),
56+
)
5357
return cls._DRIVER_SYSTEM_CONFIGURATION
5458

5559
@staticmethod
@@ -99,7 +103,11 @@ class BaseTelemetryClient(ABC):
99103
"""
100104

101105
@abstractmethod
102-
def export_initial_telemetry_log(self, **kwargs):
106+
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
107+
pass
108+
109+
@abstractmethod
110+
def export_failure_log(self, error_name, error_message):
103111
pass
104112

105113
@abstractmethod
@@ -123,6 +131,9 @@ def __new__(cls):
123131
def export_initial_telemetry_log(self, driver_connection_params, user_agent):
124132
pass
125133

134+
def export_failure_log(self, error_name, error_message):
135+
pass
136+
126137
def close(self):
127138
pass
128139

@@ -255,10 +266,33 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
255266

256267
self.export_event(telemetry_frontend_log)
257268

269+
def export_failure_log(self, error_name, error_message):
270+
logger.debug("Exporting failure log for connection %s", self._connection_uuid)
271+
error_info = DriverErrorInfo(error_name=error_name, stack_trace=error_message)
272+
telemetry_frontend_log = TelemetryFrontendLog(
273+
frontend_log_event_id=str(uuid.uuid4()),
274+
context=FrontendLogContext(
275+
client_context=TelemetryClientContext(
276+
timestamp_millis=int(time.time() * 1000),
277+
user_agent=self._user_agent,
278+
)
279+
),
280+
entry=FrontendLogEntry(
281+
sql_driver_log=TelemetryEvent(
282+
session_id=self._connection_uuid,
283+
system_configuration=TelemetryHelper.getDriverSystemConfiguration(),
284+
driver_connection_params=self._driver_connection_params,
285+
error_info=error_info,
286+
)
287+
),
288+
)
289+
self.export_event(telemetry_frontend_log)
290+
258291
def close(self):
259292
"""Flush remaining events before closing"""
260293
logger.debug("Closing TelemetryClient for connection %s", self._connection_uuid)
261294
self.flush()
295+
262296
TelemetryClientFactory.close(self._connection_uuid)
263297

264298

0 commit comments

Comments
 (0)