Skip to content

Commit c0bf461

Browse files
merge
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
2 parents 75505cf + e96e5b8 commit c0bf461

File tree

7 files changed

+775
-86
lines changed

7 files changed

+775
-86
lines changed

examples/experimental/sea_connector_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import subprocess
1111
from typing import List, Tuple
1212

13-
logging.basicConfig(level=logging.DEBUG)
13+
# Configure logging
14+
logging.basicConfig(level=logging.INFO)
1415
logger = logging.getLogger(__name__)
1516

1617
TEST_MODULES = [

src/databricks/sql/backend/filters.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
TYPE_CHECKING,
1818
)
1919

20-
from databricks.sql.utils import JsonQueue, SeaResultSetQueueFactory
2120
from databricks.sql.backend.types import ExecuteResponse, CommandId
2221
from databricks.sql.backend.sea.models.base import ResultData
2322
from databricks.sql.backend.sea.backend import SeaDatabricksClient

src/databricks/sql/backend/sea/backend.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import logging
2-
import uuid
32
import time
43
import re
5-
from typing import Dict, Tuple, List, Optional, Any, Union, TYPE_CHECKING, Set
4+
from typing import Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set
65

76
from databricks.sql.backend.sea.utils.constants import (
87
ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP,
@@ -23,9 +22,7 @@
2322
)
2423
from databricks.sql.exc import Error, NotSupportedError, ServerOperationError
2524
from databricks.sql.backend.sea.utils.http_client import SeaHttpClient
26-
from databricks.sql.thrift_api.TCLIService import ttypes
2725
from databricks.sql.types import SSLOptions
28-
from databricks.sql.utils import SeaResultSetQueueFactory
2926
from databricks.sql.backend.sea.models.base import (
3027
ResultData,
3128
ExternalLink,

src/databricks/sql/backend/thrift_backend.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141

4242
from databricks.sql.utils import (
43+
ThriftResultSetQueueFactory,
4344
_bound,
4445
RequestErrorInfo,
4546
NoRetryReason,

src/databricks/sql/result_set.py

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -474,39 +474,19 @@ def __init__(
474474
result_data: Result data from SEA response (optional)
475475
manifest: Manifest from SEA response (optional)
476476
"""
477-
# Extract and store SEA-specific properties
478-
self.statement_id = (
479-
execute_response.command_id.to_sea_statement_id()
480-
if execute_response.command_id
481-
else None
482-
)
483-
484-
# Build the results queue
485-
results_queue = None
486477

487478
if result_data:
488-
from typing import cast, List
489-
490-
# Convert description to the expected format
491-
desc = None
492-
if execute_response.description:
493-
desc = cast(List[Tuple[Any, ...]], execute_response.description)
494-
495-
results_queue = SeaResultSetQueueFactory.build_queue(
496-
result_data,
497-
manifest,
498-
str(self.statement_id),
499-
description=desc,
500-
schema_bytes=execute_response.arrow_schema_bytes
501-
if execute_response.arrow_schema_bytes
502-
else None,
503-
max_download_threads=sea_client.max_download_threads,
504-
ssl_options=sea_client.ssl_options,
505-
sea_client=sea_client,
506-
lz4_compressed=execute_response.lz4_compressed,
479+
queue = SeaResultSetQueueFactory.build_queue(
480+
sea_result_data=result_data,
481+
manifest=manifest,
482+
statement_id=execute_response.command_id.to_sea_statement_id(),
483+
description=execute_response.description,
484+
schema_bytes=execute_response.arrow_schema_bytes,
507485
)
486+
else:
487+
logger.warning("No result data provided for SEA result set")
488+
queue = JsonQueue([])
508489

509-
# Call parent constructor with common attributes
510490
super().__init__(
511491
connection=connection,
512492
backend=sea_client,
@@ -515,15 +495,13 @@ def __init__(
515495
command_id=execute_response.command_id,
516496
status=execute_response.status,
517497
has_been_closed_server_side=execute_response.has_been_closed_server_side,
498+
results_queue=queue,
518499
description=execute_response.description,
519500
is_staging_operation=execute_response.is_staging_operation,
520501
lz4_compressed=execute_response.lz4_compressed,
521-
arrow_schema_bytes=execute_response.arrow_schema_bytes or b"",
502+
arrow_schema_bytes=execute_response.arrow_schema_bytes,
522503
)
523504

524-
# Initialize queue for result data if not provided
525-
self.results = results_queue or JsonQueue([])
526-
527505
def _convert_to_row_objects(self, rows):
528506
"""
529507
Convert raw data rows to Row objects with named columns based on description.

src/databricks/sql/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
import lz4.frame
1818

19+
from databricks.sql.backend.sea.backend import SeaDatabricksClient
20+
from databricks.sql.backend.sea.models.base import ResultData, ResultManifest
21+
1922
try:
2023
import pyarrow
2124
except ImportError:

0 commit comments

Comments
 (0)