Skip to content

Commit f56497f

Browse files
make LinkFetcher convert link static
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent eb24409 commit f56497f

File tree

2 files changed

+21
-28
lines changed

2 files changed

+21
-28
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from __future__ import annotations
22

33
from abc import ABC
4-
from typing import List, Optional, Tuple, Union, TYPE_CHECKING
4+
import threading
5+
from typing import Dict, List, Optional, Tuple, Union, TYPE_CHECKING
56

67
from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager
78

@@ -142,7 +143,7 @@ def __init__(
142143

143144
for link in initial_links:
144145
self.chunk_index_to_link[link.chunk_index] = link
145-
self.download_manager.add_link(self._convert_to_thrift_link(link))
146+
self.download_manager.add_link(LinkFetcher._convert_to_thrift_link(link))
146147
self.total_chunk_count = total_chunk_count
147148

148149
def _get_next_chunk_index(self) -> Optional[int]:
@@ -188,7 +189,8 @@ def get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
188189

189190
return self.chunk_index_to_link.get(chunk_index, None)
190191

191-
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
192+
@staticmethod
193+
def _convert_to_thrift_link(link: "ExternalLink") -> TSparkArrowResultLink:
192194
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
193195
# Parse the ISO format expiration time
194196
expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())

tests/unit/test_sea_queue.py

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from databricks.sql.backend.sea.queue import (
1313
JsonQueue,
14+
LinkFetcher,
1415
SeaResultSetQueueFactory,
1516
SeaCloudFetchQueue,
1617
)
@@ -216,9 +217,7 @@ def test_build_queue_arrow_stream(
216217

217218
with patch(
218219
"databricks.sql.backend.sea.queue.ResultFileDownloadManager"
219-
), patch.object(
220-
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
221-
):
220+
), patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
222221
queue = SeaResultSetQueueFactory.build_queue(
223222
result_data=result_data,
224223
manifest=arrow_manifest,
@@ -303,10 +302,8 @@ def sample_external_link_no_headers(self):
303302

304303
def test_convert_to_thrift_link(self, sample_external_link):
305304
"""Test conversion of ExternalLink to TSparkArrowResultLink."""
306-
queue = Mock(spec=SeaCloudFetchQueue)
307-
308305
# Call the method directly
309-
result = SeaCloudFetchQueue._convert_to_thrift_link(queue, sample_external_link)
306+
result = LinkFetcher._convert_to_thrift_link(sample_external_link)
310307

311308
# Verify the conversion
312309
assert result.fileLink == sample_external_link.external_link
@@ -317,12 +314,8 @@ def test_convert_to_thrift_link(self, sample_external_link):
317314

318315
def test_convert_to_thrift_link_no_headers(self, sample_external_link_no_headers):
319316
"""Test conversion of ExternalLink with no headers to TSparkArrowResultLink."""
320-
queue = Mock(spec=SeaCloudFetchQueue)
321-
322317
# Call the method directly
323-
result = SeaCloudFetchQueue._convert_to_thrift_link(
324-
queue, sample_external_link_no_headers
325-
)
318+
result = LinkFetcher._convert_to_thrift_link(sample_external_link_no_headers)
326319

327320
# Verify the conversion
328321
assert result.fileLink == sample_external_link_no_headers.external_link
@@ -344,9 +337,7 @@ def test_init_with_valid_initial_link(
344337
):
345338
"""Test initialization with valid initial link."""
346339
# Create a queue with valid initial link
347-
with patch.object(
348-
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
349-
):
340+
with patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
350341
queue = SeaCloudFetchQueue(
351342
result_data=ResultData(external_links=[sample_external_link]),
352343
max_download_threads=5,
@@ -398,29 +389,29 @@ def test_create_next_table_success(self, mock_logger):
398389
"""Test _create_next_table with successful table creation."""
399390
# Create a queue instance without initializing
400391
queue = Mock(spec=SeaCloudFetchQueue)
401-
queue._current_chunk_index = 0
392+
queue.current_chunk_index = 0
402393
queue.download_manager = Mock()
394+
queue.link_fetcher = Mock()
403395

404396
# Mock the dependencies
405397
mock_table = Mock()
406398
mock_chunk_link = Mock()
407-
queue._get_chunk_link = Mock(return_value=mock_chunk_link)
408-
queue._create_table_from_link = Mock(return_value=mock_table)
399+
queue.link_fetcher.get_chunk_link = Mock(return_value=mock_chunk_link)
400+
queue._create_table_at_offset = Mock(return_value=mock_table)
409401

410402
# Call the method directly
411-
result = SeaCloudFetchQueue._create_next_table(queue)
403+
SeaCloudFetchQueue._create_next_table(queue)
412404

413405
# Verify the chunk index was incremented
414-
assert queue._current_chunk_index == 1
406+
assert queue.current_chunk_index == 1
415407

416408
# Verify the chunk link was retrieved
417-
queue._get_chunk_link.assert_called_once_with(1)
409+
queue.link_fetcher.get_chunk_link.assert_called_once_with(0)
418410

419411
# Verify the table was created from the link
420-
queue._create_table_from_link.assert_called_once_with(mock_chunk_link)
421-
422-
# Verify the result is the table
423-
assert result == mock_table
412+
queue._create_table_at_offset.assert_called_once_with(
413+
mock_chunk_link.row_offset
414+
)
424415

425416

426417
class TestHybridDisposition:
@@ -494,7 +485,7 @@ def test_hybrid_disposition_with_attachment(
494485
mock_create_table.assert_called_once_with(attachment_data, description)
495486

496487
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
497-
@patch.object(SeaCloudFetchQueue, "_create_table_from_link", return_value=None)
488+
@patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None)
498489
def test_hybrid_disposition_with_external_links(
499490
self,
500491
mock_create_table,

0 commit comments

Comments
 (0)