Skip to content

Commit 777f7c1

Browse files
move download_manager init into parent CloudFetchQueue
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 0b1eba5 commit 777f7c1

File tree

3 files changed

+18
-50
lines changed

3 files changed

+18
-50
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
ResultManifest,
2020
)
2121
from databricks.sql.backend.sea.utils.constants import ResultFormat
22-
from databricks.sql.exc import ProgrammingError
22+
from databricks.sql.exc import ProgrammingError, ServerOperationError
2323
from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
2424
from databricks.sql.types import SSLOptions
2525
from databricks.sql.utils import CloudFetchQueue, ResultSetQueue
@@ -149,13 +149,6 @@ def __init__(
149149
if not first_link:
150150
return
151151

152-
self.download_manager = ResultFileDownloadManager(
153-
links=[],
154-
max_download_threads=max_download_threads,
155-
lz4_compressed=lz4_compressed,
156-
ssl_options=ssl_options,
157-
)
158-
159152
# Track the current chunk we're processing
160153
self._current_chunk_link = first_link
161154

@@ -191,13 +184,13 @@ def _progress_chunk_link(self):
191184
self._statement_id, next_chunk_index
192185
)
193186
except Exception as e:
194-
logger.error(
195-
"SeaCloudFetchQueue: Error fetching link for chunk {}: {}".format(
196-
next_chunk_index, e
197-
)
187+
raise ServerOperationError(
188+
f"Error fetching link for chunk {next_chunk_index}: {e}",
189+
{
190+
"operation-id": self._statement_id,
191+
"diagnostic-info": None,
192+
},
198193
)
199-
self._current_chunk_link = None
200-
return None
201194

202195
logger.debug(
203196
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
@@ -208,10 +201,6 @@ def _create_table_from_link(
208201
) -> Union["pyarrow.Table", None]:
209202
"""Create a table from a link."""
210203

211-
if not self.download_manager:
212-
logger.debug("SeaCloudFetchQueue: No download manager, returning")
213-
return None
214-
215204
thrift_link = self._convert_to_thrift_link(link)
216205
self.download_manager.add_link(thrift_link)
217206

src/databricks/sql/utils.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,12 @@ def __init__(
230230
self.table_row_index = 0
231231

232232
# Initialize download manager
233-
self.download_manager: Optional["ResultFileDownloadManager"] = None
233+
self.download_manager = ResultFileDownloadManager(
234+
links=[],
235+
max_download_threads=max_download_threads,
236+
lz4_compressed=lz4_compressed,
237+
ssl_options=ssl_options,
238+
)
234239

235240
def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
236241
"""
@@ -287,11 +292,8 @@ def remaining_rows(self) -> "pyarrow.Table":
287292

288293
def _create_table_at_offset(self, offset: int) -> Union["pyarrow.Table", None]:
289294
"""Create next table at the given row offset"""
290-
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
291-
if not self.download_manager:
292-
logger.debug("CloudFetchQueue: No download manager available")
293-
return None
294295

296+
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
295297
downloaded_file = self.download_manager.get_next_downloaded_file(offset)
296298
if not downloaded_file:
297299
logger.debug(
@@ -373,14 +375,7 @@ def __init__(
373375
result_link.startRowOffset, result_link.rowCount
374376
)
375377
)
376-
377-
# Initialize download manager
378-
self.download_manager = ResultFileDownloadManager(
379-
links=self.result_links,
380-
max_download_threads=self.max_download_threads,
381-
lz4_compressed=self.lz4_compressed,
382-
ssl_options=self._ssl_options,
383-
)
378+
self.download_manager.add_link(result_link)
384379

385380
# Initialize table and position
386381
self.table = self._create_next_table()

tests/unit/test_sea_queue.py

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
ExternalLink,
1919
)
2020
from databricks.sql.backend.sea.utils.constants import ResultFormat
21-
from databricks.sql.exc import ProgrammingError
21+
from databricks.sql.exc import ProgrammingError, ServerOperationError
2222
from databricks.sql.types import SSLOptions
2323

2424

@@ -340,9 +340,6 @@ def test_init_with_valid_initial_link(
340340
sample_external_link,
341341
):
342342
"""Test initialization with valid initial link."""
343-
mock_download_manager = Mock()
344-
mock_download_manager_class.return_value = mock_download_manager
345-
346343
# Create a queue with valid initial link
347344
with patch.object(
348345
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
@@ -365,13 +362,9 @@ def test_init_with_valid_initial_link(
365362
)
366363
)
367364

368-
# Verify download manager was created
369-
mock_download_manager_class.assert_called_once()
370-
371365
# Verify attributes
372366
assert queue._statement_id == "test-statement-123"
373367
assert queue._current_chunk_link == sample_external_link
374-
assert queue.download_manager == mock_download_manager
375368

376369
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
377370
@patch("databricks.sql.backend.sea.queue.logger")
@@ -514,21 +507,12 @@ def test_progress_chunk_link_error(self, mock_logger, mock_sea_client):
514507
mock_sea_client.get_chunk_link.side_effect = Exception(error_message)
515508

516509
# Call the method directly
517-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
510+
with pytest.raises(ServerOperationError, match=error_message):
511+
SeaCloudFetchQueue._progress_chunk_link(queue)
518512

519513
# Verify the client was called
520514
mock_sea_client.get_chunk_link.assert_called_once_with("test-statement-123", 1)
521515

522-
# Verify error message was logged
523-
mock_logger.error.assert_called_with(
524-
"SeaCloudFetchQueue: Error fetching link for chunk {}: {}".format(
525-
1, error_message
526-
)
527-
)
528-
529-
# Verify the result is None
530-
assert result is None
531-
532516
@patch("databricks.sql.backend.sea.queue.logger")
533517
def test_create_next_table_no_current_link(self, mock_logger):
534518
"""Test _create_next_table with no current link."""

0 commit comments

Comments
 (0)