Skip to content

Commit c288da4

Browse files
Merge branch 'ext-links-sea' into sea-complex-types
2 parents b68e5de + abef941 commit c288da4

22 files changed

+1767
-886
lines changed

.github/CODEOWNERS

Lines changed: 0 additions & 5 deletions
This file was deleted.

examples/experimental/tests/test_sea_async_query.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from databricks.sql.client import Connection
99
from databricks.sql.backend.types import CommandState
1010

11-
logging.basicConfig(level=logging.DEBUG)
11+
logging.basicConfig(level=logging.INFO)
1212
logger = logging.getLogger(__name__)
1313

1414

examples/experimental/tests/test_sea_sync_query.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
from databricks.sql.client import Connection
88

9-
logging.basicConfig(level=logging.DEBUG)
9+
logging.basicConfig(level=logging.INFO)
1010
logger = logging.getLogger(__name__)
1111

1212

src/databricks/sql/backend/databricks_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,10 @@ def execute_command(
9191
lz4_compression: bool,
9292
cursor: "Cursor",
9393
use_cloud_fetch: bool,
94-
parameters: List[ttypes.TSparkParameter],
94+
parameters: List,
9595
async_op: bool,
9696
enforce_embedded_schema_correctness: bool,
97+
row_limit: Optional[int] = None,
9798
) -> Union["ResultSet", None]:
9899
"""
99100
Executes a SQL command or query within the specified session.
@@ -112,6 +113,7 @@ def execute_command(
112113
parameters: List of parameters to bind to the query
113114
async_op: Whether to execute the command asynchronously
114115
enforce_embedded_schema_correctness: Whether to enforce schema correctness
116+
row_limit: Maximum number of rows in the operation result.
115117
116118
Returns:
117119
If async_op is False, returns a ResultSet object containing the

src/databricks/sql/backend/sea/backend.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import time
55
import re
6-
from typing import Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set
6+
from typing import Any, Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set
77

88
from databricks.sql.backend.sea.models.base import ExternalLink, ResultManifest
99
from databricks.sql.backend.sea.utils.constants import (
@@ -17,7 +17,7 @@
1717

1818
if TYPE_CHECKING:
1919
from databricks.sql.client import Cursor
20-
from databricks.sql.result_set import SeaResultSet
20+
from databricks.sql.backend.sea.result_set import SeaResultSet
2121

2222
from databricks.sql.backend.databricks_client import DatabricksClient
2323
from databricks.sql.backend.types import (
@@ -27,7 +27,7 @@
2727
BackendType,
2828
ExecuteResponse,
2929
)
30-
from databricks.sql.exc import DatabaseError, ProgrammingError, ServerOperationError
30+
from databricks.sql.exc import DatabaseError, ServerOperationError
3131
from databricks.sql.backend.sea.utils.http_client import SeaHttpClient
3232
from databricks.sql.types import SSLOptions
3333

@@ -150,7 +150,7 @@ def _extract_warehouse_id(self, http_path: str) -> str:
150150
The extracted warehouse ID
151151
152152
Raises:
153-
ProgrammingError: If the warehouse ID cannot be extracted from the path
153+
ValueError: If the warehouse ID cannot be extracted from the path
154154
"""
155155

156156
warehouse_pattern = re.compile(r".*/warehouses/(.+)")
@@ -174,7 +174,7 @@ def _extract_warehouse_id(self, http_path: str) -> str:
174174
f"Note: SEA only works for warehouses."
175175
)
176176
logger.error(error_message)
177-
raise ProgrammingError(error_message)
177+
raise ValueError(error_message)
178178

179179
@property
180180
def max_download_threads(self) -> int:
@@ -246,14 +246,14 @@ def close_session(self, session_id: SessionId) -> None:
246246
session_id: The session identifier returned by open_session()
247247
248248
Raises:
249-
ProgrammingError: If the session ID is invalid
249+
ValueError: If the session ID is invalid
250250
OperationalError: If there's an error closing the session
251251
"""
252252

253253
logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id)
254254

255255
if session_id.backend_type != BackendType.SEA:
256-
raise ProgrammingError("Not a valid SEA session ID")
256+
raise ValueError("Not a valid SEA session ID")
257257
sea_session_id = session_id.to_sea_session_id()
258258

259259
request_data = DeleteSessionRequest(
@@ -292,7 +292,7 @@ def get_allowed_session_configurations() -> List[str]:
292292

293293
def _extract_description_from_manifest(
294294
self, manifest: ResultManifest
295-
) -> Optional[List]:
295+
) -> List[Tuple]:
296296
"""
297297
Extract column description from a manifest object, in the format defined by
298298
the spec: https://peps.python.org/pep-0249/#description
@@ -301,15 +301,12 @@ def _extract_description_from_manifest(
301301
manifest: The ResultManifest object containing schema information
302302
303303
Returns:
304-
Optional[List]: A list of column tuples or None if no columns are found
304+
List[Tuple]: A list of column tuples
305305
"""
306306

307307
schema_data = manifest.schema
308308
columns_data = schema_data.get("columns", [])
309309

310-
if not columns_data:
311-
return None
312-
313310
columns = []
314311
for col_data in columns_data:
315312
# Format: (name, type_code, display_size, internal_size, precision, scale, null_ok)
@@ -325,7 +322,7 @@ def _extract_description_from_manifest(
325322
)
326323
)
327324

328-
return columns if columns else None
325+
return columns
329326

330327
def _results_message_to_execute_response(
331328
self, response: GetStatementResponse
@@ -410,6 +407,7 @@ def execute_command(
410407
parameters: List[Dict[str, Any]],
411408
async_op: bool,
412409
enforce_embedded_schema_correctness: bool,
410+
row_limit: Optional[int] = None,
413411
) -> Union[SeaResultSet, None]:
414412
"""
415413
Execute a SQL command using the SEA backend.
@@ -431,7 +429,7 @@ def execute_command(
431429
"""
432430

433431
if session_id.backend_type != BackendType.SEA:
434-
raise ProgrammingError("Not a valid SEA session ID")
432+
raise ValueError("Not a valid SEA session ID")
435433

436434
sea_session_id = session_id.to_sea_session_id()
437435

@@ -467,7 +465,7 @@ def execute_command(
467465
format=format,
468466
wait_timeout=(WaitTimeout.ASYNC if async_op else WaitTimeout.SYNC).value,
469467
on_wait_timeout="CONTINUE",
470-
row_limit=max_rows,
468+
row_limit=row_limit,
471469
parameters=sea_parameters if sea_parameters else None,
472470
result_compression=result_compression,
473471
)
@@ -506,13 +504,15 @@ def cancel_command(self, command_id: CommandId) -> None:
506504
command_id: Command identifier to cancel
507505
508506
Raises:
509-
ProgrammingError: If the command ID is invalid
507+
ValueError: If the command ID is invalid
510508
"""
511509

512510
if command_id.backend_type != BackendType.SEA:
513-
raise ProgrammingError("Not a valid SEA command ID")
511+
raise ValueError("Not a valid SEA command ID")
514512

515513
sea_statement_id = command_id.to_sea_statement_id()
514+
if sea_statement_id is None:
515+
raise ValueError("Not a valid SEA command ID")
516516

517517
request = CancelStatementRequest(statement_id=sea_statement_id)
518518
self.http_client._make_request(
@@ -529,13 +529,15 @@ def close_command(self, command_id: CommandId) -> None:
529529
command_id: Command identifier to close
530530
531531
Raises:
532-
ProgrammingError: If the command ID is invalid
532+
ValueError: If the command ID is invalid
533533
"""
534534

535535
if command_id.backend_type != BackendType.SEA:
536-
raise ProgrammingError("Not a valid SEA command ID")
536+
raise ValueError("Not a valid SEA command ID")
537537

538538
sea_statement_id = command_id.to_sea_statement_id()
539+
if sea_statement_id is None:
540+
raise ValueError("Not a valid SEA command ID")
539541

540542
request = CloseStatementRequest(statement_id=sea_statement_id)
541543
self.http_client._make_request(
@@ -555,13 +557,15 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
555557
CommandState: The current state of the command
556558
557559
Raises:
558-
ProgrammingError: If the command ID is invalid
560+
ValueError: If the command ID is invalid
559561
"""
560562

561563
if command_id.backend_type != BackendType.SEA:
562-
raise ProgrammingError("Not a valid SEA command ID")
564+
raise ValueError("Not a valid SEA command ID")
563565

564566
sea_statement_id = command_id.to_sea_statement_id()
567+
if sea_statement_id is None:
568+
raise ValueError("Not a valid SEA command ID")
565569

566570
request = GetStatementRequest(statement_id=sea_statement_id)
567571
response_data = self.http_client._make_request(
@@ -590,13 +594,15 @@ def get_execution_result(
590594
SeaResultSet: A SeaResultSet instance with the execution results
591595
592596
Raises:
593-
ProgrammingError: If the command ID is invalid
597+
ValueError: If the command ID is invalid
594598
"""
595599

596600
if command_id.backend_type != BackendType.SEA:
597-
raise ProgrammingError("Not a valid SEA command ID")
601+
raise ValueError("Not a valid SEA command ID")
598602

599603
sea_statement_id = command_id.to_sea_statement_id()
604+
if sea_statement_id is None:
605+
raise ValueError("Not a valid SEA command ID")
600606

601607
# Create the request model
602608
request = GetStatementRequest(statement_id=sea_statement_id)
@@ -610,7 +616,7 @@ def get_execution_result(
610616
response = GetStatementResponse.from_dict(response_data)
611617

612618
# Create and return a SeaResultSet
613-
from databricks.sql.result_set import SeaResultSet
619+
from databricks.sql.backend.sea.result_set import SeaResultSet
614620

615621
execute_response = self._results_message_to_execute_response(response)
616622

0 commit comments

Comments
 (0)