Skip to content

Commit e74ccd1

Browse files
hybrid disposition
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 4a9ba21 commit e74ccd1

File tree

3 files changed: +34 -13 lines changed

3 files changed: +34 -13 lines changed

src/databricks/sql/backend/sea/backend.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,8 @@ def __init__(
125125

126126
super().__init__(ssl_options=ssl_options, **kwargs)
127127

128-
self.use_hybrid_disposition = kwargs.get("use_hybrid_disposition", False)
128+
self.use_hybrid_disposition = kwargs.get("use_hybrid_disposition", True)
129+
logger.info(f"use_hybrid_disposition: {self.use_hybrid_disposition}")
129130

130131
# Extract warehouse ID from http_path
131132
self.warehouse_id = self._extract_warehouse_id(http_path)

src/databricks/sql/backend/sea/models/responses.py

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
These models define the structures used in SEA API responses.
55
"""
66

7+
import base64
78
from typing import Dict, Any, List
89
from dataclasses import dataclass
910

@@ -91,6 +92,12 @@ def _parse_result(data: Dict[str, Any]) -> ResultData:
9192
)
9293
)
9394

95+
# Handle attachment field - decode from base64 if present
96+
attachment = result_data.get("attachment")
97+
if attachment is not None and isinstance(attachment, str):
98+
# Decode base64 string to bytes
99+
attachment = base64.b64decode(attachment)
100+
94101
return ResultData(
95102
data=result_data.get("data_array"),
96103
external_links=external_links,
@@ -100,7 +107,7 @@ def _parse_result(data: Dict[str, Any]) -> ResultData:
100107
next_chunk_internal_link=result_data.get("next_chunk_internal_link"),
101108
row_count=result_data.get("row_count"),
102109
row_offset=result_data.get("row_offset"),
103-
attachment=result_data.get("attachment"),
110+
attachment=attachment,
104111
)
105112

106113

src/databricks/sql/backend/sea/queue.py

Lines changed: 24 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@
55

66
from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager
77

8+
import lz4.frame
9+
810
try:
911
import pyarrow
1012
except ImportError:
@@ -22,7 +24,7 @@
2224
from databricks.sql.exc import ProgrammingError
2325
from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
2426
from databricks.sql.types import SSLOptions
25-
from databricks.sql.utils import CloudFetchQueue, ResultSetQueue
27+
from databricks.sql.utils import ArrowQueue, CloudFetchQueue, ResultSetQueue, create_arrow_table_from_arrow_file
2628

2729
import logging
2830

@@ -61,6 +63,15 @@ def build_queue(
6163
# INLINE disposition with JSON_ARRAY format
6264
return JsonQueue(result_data.data)
6365
elif manifest.format == ResultFormat.ARROW_STREAM.value:
66+
if result_data.attachment is not None:
67+
arrow_file = (
68+
lz4.frame.decompress(result_data.attachment)
69+
if lz4_compressed
70+
else result_data.attachment
71+
)
72+
arrow_table = create_arrow_table_from_arrow_file(arrow_file, description)
73+
return ArrowQueue(arrow_table, manifest.total_row_count)
74+
6475
# EXTERNAL_LINKS disposition
6576
return SeaCloudFetchQueue(
6677
initial_links=result_data.external_links or [],
@@ -144,7 +155,9 @@ def __init__(
144155
)
145156
)
146157

147-
initial_link = next((l for l in initial_links if l.chunk_index == 0), None)
158+
self._chunk_index_to_link = {link.chunk_index: link for link in initial_links}
159+
160+
initial_link = self._chunk_index_to_link.get(0, None)
148161
if not initial_link:
149162
return
150163

@@ -174,6 +187,12 @@ def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink
174187
httpHeaders=link.http_headers or {},
175188
)
176189

190+
def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
191+
if chunk_index not in self._chunk_index_to_link:
192+
links = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
193+
self._chunk_index_to_link.update({link.chunk_index: link for link in links})
194+
return self._chunk_index_to_link.get(chunk_index, None)
195+
177196
def _progress_chunk_link(self):
178197
"""Progress to the next chunk link."""
179198
if not self._current_chunk_link:
@@ -185,17 +204,11 @@ def _progress_chunk_link(self):
185204
self._current_chunk_link = None
186205
return None
187206

188-
try:
189-
self._current_chunk_link = self._sea_client.get_chunk_link(
190-
self._statement_id, next_chunk_index
191-
)
192-
except Exception as e:
207+
self._current_chunk_link = self._get_chunk_link(next_chunk_index)
208+
if not self._current_chunk_link:
193209
logger.error(
194-
"SeaCloudFetchQueue: Error fetching link for chunk {}: {}".format(
195-
next_chunk_index, e
196-
)
210+
"SeaCloudFetchQueue: unable to retrieve link for chunk {}".format(next_chunk_index)
197211
)
198-
self._current_chunk_link = None
199212
return None
200213

201214
logger.debug(

0 commit comments

Comments (0)