Skip to content

Commit 671dbca

Browse files
Merge branch 'ext-links-sea' into sea-hybrid
2 parents 239cd8d + fce324b commit 671dbca

File tree

3 files changed

+55 additions, -19 deletions (lines changed)

3 files changed

+55 additions, -19 deletions (lines changed)

src/databricks/sql/backend/sea/queue.py

Lines changed: 18 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -177,10 +177,10 @@ def __init__(
177177
)
178178

179179
# Track the current chunk we're processing
180-
self._current_chunk_link: Optional["ExternalLink"] = initial_link
180+
self._current_chunk_link = initial_link
181181

182182
# Initialize table and position
183-
self.table = self._create_next_table()
183+
self.table = self._create_table_from_link(self._current_chunk_link)
184184

185185
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
186186
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
@@ -225,22 +225,30 @@ def _progress_chunk_link(self):
225225
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
226226
)
227227

228-
def _create_next_table(self) -> Union["pyarrow.Table", None]:
229-
"""Create next table by retrieving the logical next downloaded file."""
230-
if not self._current_chunk_link:
231-
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
232-
return None
228+
def _create_table_from_link(
229+
self, link: "ExternalLink"
230+
) -> Union["pyarrow.Table", None]:
231+
"""Create a table from a link."""
233232

234233
if not self.download_manager:
235234
logger.debug("SeaCloudFetchQueue: No download manager, returning")
236235
return None
237236

238-
thrift_link = self._convert_to_thrift_link(self._current_chunk_link)
237+
thrift_link = self._convert_to_thrift_link(link)
239238
self.download_manager.add_link(thrift_link)
240239

241-
row_offset = self._current_chunk_link.row_offset
240+
row_offset = link.row_offset
242241
arrow_table = self._create_table_at_offset(row_offset)
243242

243+
return arrow_table
244+
245+
def _create_next_table(self) -> Union["pyarrow.Table", None]:
246+
"""Create next table by retrieving the logical next downloaded file."""
247+
244248
self._progress_chunk_link()
245249

246-
return arrow_table
250+
if not self._current_chunk_link:
251+
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
252+
return None
253+
254+
return self._create_table_from_link(self._current_chunk_link)

tests/e2e/common/large_queries_mixin.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import math
33
import time
44

5+
import pytest
6+
57
log = logging.getLogger(__name__)
68

79

@@ -42,7 +44,14 @@ def fetch_rows(self, cursor, row_count, fetchmany_size):
4244
+ "assuming 10K fetch size."
4345
)
4446

45-
def test_query_with_large_wide_result_set(self):
47+
@pytest.mark.parametrize(
48+
"extra_params",
49+
[
50+
{},
51+
{"use_sea": True},
52+
],
53+
)
54+
def test_query_with_large_wide_result_set(self, extra_params):
4655
resultSize = 300 * 1000 * 1000 # 300 MB
4756
width = 8192 # B
4857
rows = resultSize // width
@@ -52,7 +61,7 @@ def test_query_with_large_wide_result_set(self):
5261
fetchmany_size = 10 * 1024 * 1024 // width
5362
# This is used by PyHive tests to determine the buffer size
5463
self.arraysize = 1000
55-
with self.cursor() as cursor:
64+
with self.cursor(extra_params) as cursor:
5665
for lz4_compression in [False, True]:
5766
cursor.connection.lz4_compression = lz4_compression
5867
uuids = ", ".join(["uuid() uuid{}".format(i) for i in range(cols)])
@@ -68,7 +77,14 @@ def test_query_with_large_wide_result_set(self):
6877
assert row[0] == row_id # Verify no rows are dropped in the middle.
6978
assert len(row[1]) == 36
7079

71-
def test_query_with_large_narrow_result_set(self):
80+
@pytest.mark.parametrize(
81+
"extra_params",
82+
[
83+
{},
84+
{"use_sea": True},
85+
],
86+
)
87+
def test_query_with_large_narrow_result_set(self, extra_params):
7288
resultSize = 300 * 1000 * 1000 # 300 MB
7389
width = 8 # sizeof(long)
7490
rows = resultSize / width
@@ -77,12 +93,19 @@ def test_query_with_large_narrow_result_set(self):
7793
fetchmany_size = 10 * 1024 * 1024 // width
7894
# This is used by PyHive tests to determine the buffer size
7995
self.arraysize = 10000000
80-
with self.cursor() as cursor:
96+
with self.cursor(extra_params) as cursor:
8197
cursor.execute("SELECT * FROM RANGE({rows})".format(rows=rows))
8298
for row_id, row in enumerate(self.fetch_rows(cursor, rows, fetchmany_size)):
8399
assert row[0] == row_id
84100

85-
def test_long_running_query(self):
101+
@pytest.mark.parametrize(
102+
"extra_params",
103+
[
104+
{},
105+
{"use_sea": True},
106+
],
107+
)
108+
def test_long_running_query(self, extra_params):
86109
"""Incrementally increase query size until it takes at least 3 minutes,
87110
and asserts that the query completes successfully.
88111
"""
@@ -92,7 +115,7 @@ def test_long_running_query(self):
92115
duration = -1
93116
scale0 = 10000
94117
scale_factor = 1
95-
with self.cursor() as cursor:
118+
with self.cursor(extra_params) as cursor:
96119
while duration < min_duration:
97120
assert scale_factor < 1024, "Detected infinite loop"
98121
start = time.time()

tests/unit/test_sea_queue.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ def test_build_queue_arrow_stream(
213213

214214
with patch(
215215
"databricks.sql.backend.sea.queue.ResultFileDownloadManager"
216-
), patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
216+
), patch.object(
217+
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
218+
):
217219
queue = SeaResultSetQueueFactory.build_queue(
218220
result_data=result_data,
219221
manifest=arrow_manifest,
@@ -342,7 +344,9 @@ def test_init_with_valid_initial_link(
342344
mock_download_manager_class.return_value = mock_download_manager
343345

344346
# Create a queue with valid initial link
345-
with patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
347+
with patch.object(
348+
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
349+
):
346350
queue = SeaCloudFetchQueue(
347351
initial_links=[sample_external_link],
348352
max_download_threads=5,
@@ -528,13 +532,14 @@ def test_create_next_table_success(self, mock_logger):
528532
# Mock the dependencies
529533
mock_table = Mock()
530534
queue._create_table_at_offset = Mock(return_value=mock_table)
535+
queue._create_table_from_link = Mock(return_value=mock_table)
531536
queue._progress_chunk_link = Mock()
532537

533538
# Call the method directly
534539
result = SeaCloudFetchQueue._create_next_table(queue)
535540

536541
# Verify the table was created
537-
queue._create_table_at_offset.assert_called_once_with(50)
542+
queue._create_table_from_link.assert_called_once_with(queue._current_chunk_link)
538543

539544
# Verify progress was called
540545
queue._progress_chunk_link.assert_called_once()

0 commit comments

Comments
 (0)