Skip to content

Commit 808e31b

Browse files
Merge branch 'ext-links-sea' into sea-hybrid
2 parents 2701e5d + cc514ad commit 808e31b

File tree

2 files changed

+26
-89
lines changed

2 files changed

+26
-89
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from abc import ABC
4-
from typing import List, Optional, Tuple, Union
4+
from typing import List, Optional, Tuple, Union, TYPE_CHECKING
55

66
from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager
77

@@ -14,12 +14,13 @@
1414

1515
import dateutil
1616

17-
from databricks.sql.backend.sea.backend import SeaDatabricksClient
18-
from databricks.sql.backend.sea.models.base import (
19-
ExternalLink,
20-
ResultData,
21-
ResultManifest,
22-
)
17+
if TYPE_CHECKING:
18+
from databricks.sql.backend.sea.backend import SeaDatabricksClient
19+
from databricks.sql.backend.sea.models.base import (
20+
ExternalLink,
21+
ResultData,
22+
ResultManifest,
23+
)
2324
from databricks.sql.backend.sea.utils.constants import ResultFormat
2425
from databricks.sql.exc import ProgrammingError, ServerOperationError
2526
from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
@@ -128,7 +129,7 @@ def __init__(
128129
result_data: ResultData,
129130
max_download_threads: int,
130131
ssl_options: SSLOptions,
131-
sea_client: "SeaDatabricksClient",
132+
sea_client: SeaDatabricksClient,
132133
statement_id: str,
133134
total_chunk_count: int,
134135
lz4_compressed: bool = False,
@@ -160,6 +161,7 @@ def __init__(
160161
self._sea_client = sea_client
161162
self._statement_id = statement_id
162163
self._total_chunk_count = total_chunk_count
164+
self._total_chunk_count = total_chunk_count
163165

164166
logger.debug(
165167
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
@@ -175,19 +177,14 @@ def __init__(
175177
first_link = self._chunk_index_to_link.get(self._current_chunk_index, None)
176178
if not first_link:
177179
# possibly an empty response
178-
return
179-
180-
self.download_manager = ResultFileDownloadManager(
181-
links=[],
182-
max_download_threads=max_download_threads,
183-
lz4_compressed=lz4_compressed,
184-
ssl_options=ssl_options,
185-
)
180+
return None
186181

182+
# Track the current chunk we're processing
183+
self._current_chunk_index = 0
187184
# Initialize table and position
188185
self.table = self._create_table_from_link(first_link)
189186

190-
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
187+
def _convert_to_thrift_link(self, link: ExternalLink) -> TSparkArrowResultLink:
191188
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
192189
# Parse the ISO format expiration time
193190
expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
@@ -220,7 +217,7 @@ def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
220217
return link
221218

222219
def _create_table_from_link(
223-
self, link: "ExternalLink"
220+
self, link: ExternalLink
224221
) -> Union["pyarrow.Table", None]:
225222
"""Create a table from a link."""
226223

tests/unit/test_sea_queue.py

Lines changed: 11 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ def test_init_with_valid_initial_link(
364364

365365
# Verify attributes
366366
assert queue._statement_id == "test-statement-123"
367-
assert queue._current_chunk_link == sample_external_link
367+
assert queue._current_chunk_index == 0
368368

369369
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
370370
@patch("databricks.sql.backend.sea.queue.logger")
@@ -390,91 +390,31 @@ def test_init_no_initial_links(
390390
)
391391
assert queue.table is None
392392

393-
@patch("databricks.sql.backend.sea.queue.logger")
394-
def test_progress_chunk_link_no_current_link(self, mock_logger):
395-
"""Test _progress_chunk_link with no current link."""
396-
# Create a queue instance without initializing
397-
queue = Mock(spec=SeaCloudFetchQueue)
398-
queue._current_chunk_link = None
399-
400-
# Call the method directly
401-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
402-
403-
# Verify the result is None
404-
assert result is None
405-
406-
@patch("databricks.sql.backend.sea.queue.logger")
407-
def test_progress_chunk_link_no_next_chunk(self, mock_logger):
408-
"""Test _progress_chunk_link with no next chunk index."""
409-
# Create a queue instance without initializing
410-
queue = Mock(spec=SeaCloudFetchQueue)
411-
queue._current_chunk_link = ExternalLink(
412-
external_link="https://example.com/data/chunk0",
413-
expiration="2025-07-03T05:51:18.118009",
414-
row_count=100,
415-
byte_count=1024,
416-
row_offset=0,
417-
chunk_index=0,
418-
next_chunk_index=None,
419-
http_headers={"Authorization": "Bearer token123"},
420-
)
421-
422-
# Call the method directly
423-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
424-
425-
# Verify the result is None
426-
assert result is None
427-
assert queue._current_chunk_link is None
428-
429-
@patch("databricks.sql.backend.sea.queue.logger")
430-
def test_create_next_table_no_current_link(self, mock_logger):
431-
"""Test _create_next_table with no current link."""
432-
# Create a queue instance without initializing
433-
queue = Mock(spec=SeaCloudFetchQueue)
434-
queue._current_chunk_link = None
435-
436-
# Call the method directly
437-
result = SeaCloudFetchQueue._create_next_table(queue)
438-
439-
# Verify debug message was logged
440-
mock_logger.debug.assert_called_with(
441-
"SeaCloudFetchQueue: No current chunk link, returning"
442-
)
443-
444-
# Verify the result is None
445-
assert result is None
446-
447393
@patch("databricks.sql.backend.sea.queue.logger")
448394
def test_create_next_table_success(self, mock_logger):
449395
"""Test _create_next_table with successful table creation."""
450396
# Create a queue instance without initializing
451397
queue = Mock(spec=SeaCloudFetchQueue)
452-
queue._current_chunk_link = ExternalLink(
453-
external_link="https://example.com/data/chunk0",
454-
expiration="2025-07-03T05:51:18.118009",
455-
row_count=100,
456-
byte_count=1024,
457-
row_offset=50,
458-
chunk_index=0,
459-
next_chunk_index=1,
460-
http_headers={"Authorization": "Bearer token123"},
461-
)
398+
queue._current_chunk_index = 0
462399
queue.download_manager = Mock()
463400

464401
# Mock the dependencies
465402
mock_table = Mock()
466-
queue._create_table_at_offset = Mock(return_value=mock_table)
403+
mock_chunk_link = Mock()
404+
queue._get_chunk_link = Mock(return_value=mock_chunk_link)
467405
queue._create_table_from_link = Mock(return_value=mock_table)
468-
queue._progress_chunk_link = Mock()
469406

470407
# Call the method directly
471408
result = SeaCloudFetchQueue._create_next_table(queue)
472409

473-
# Verify the table was created
474-
queue._create_table_from_link.assert_called_once_with(queue._current_chunk_link)
410+
# Verify the chunk index was incremented
411+
assert queue._current_chunk_index == 1
412+
413+
# Verify the chunk link was retrieved
414+
queue._get_chunk_link.assert_called_once_with(1)
475415

476-
# Verify progress was called
477-
queue._progress_chunk_link.assert_called_once()
416+
# Verify the table was created from the link
417+
queue._create_table_from_link.assert_called_once_with(mock_chunk_link)
478418

479419
# Verify the result is the table
480420
assert result == mock_table

0 commit comments

Comments
 (0)