Skip to content

Commit fce324b

Browse files
move link fetching immediately before table creation so link expiry is not an issue
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent abef941 commit fce324b

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ def __init__(
156156
)
157157

158158
# Track the current chunk we're processing
159-
self._current_chunk_link: Optional["ExternalLink"] = initial_link
159+
self._current_chunk_link = initial_link
160160

161161
# Initialize table and position
162-
self.table = self._create_next_table()
162+
self.table = self._create_table_from_link(self._current_chunk_link)
163163

164164
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
165165
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
@@ -202,22 +202,30 @@ def _progress_chunk_link(self):
202202
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
203203
)
204204

205-
def _create_next_table(self) -> Union["pyarrow.Table", None]:
206-
"""Create next table by retrieving the logical next downloaded file."""
207-
if not self._current_chunk_link:
208-
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
209-
return None
205+
def _create_table_from_link(
206+
self, link: "ExternalLink"
207+
) -> Union["pyarrow.Table", None]:
208+
"""Create a table from a link."""
210209

211210
if not self.download_manager:
212211
logger.debug("SeaCloudFetchQueue: No download manager, returning")
213212
return None
214213

215-
thrift_link = self._convert_to_thrift_link(self._current_chunk_link)
214+
thrift_link = self._convert_to_thrift_link(link)
216215
self.download_manager.add_link(thrift_link)
217216

218-
row_offset = self._current_chunk_link.row_offset
217+
row_offset = link.row_offset
219218
arrow_table = self._create_table_at_offset(row_offset)
220219

220+
return arrow_table
221+
222+
def _create_next_table(self) -> Union["pyarrow.Table", None]:
223+
"""Create next table by retrieving the logical next downloaded file."""
224+
221225
self._progress_chunk_link()
222226

223-
return arrow_table
227+
if not self._current_chunk_link:
228+
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
229+
return None
230+
231+
return self._create_table_from_link(self._current_chunk_link)

tests/unit/test_sea_queue.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ def test_build_queue_arrow_stream(
213213

214214
with patch(
215215
"databricks.sql.backend.sea.queue.ResultFileDownloadManager"
216-
), patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
216+
), patch.object(
217+
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
218+
):
217219
queue = SeaResultSetQueueFactory.build_queue(
218220
result_data=result_data,
219221
manifest=arrow_manifest,
@@ -342,7 +344,9 @@ def test_init_with_valid_initial_link(
342344
mock_download_manager_class.return_value = mock_download_manager
343345

344346
# Create a queue with valid initial link
345-
with patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
347+
with patch.object(
348+
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
349+
):
346350
queue = SeaCloudFetchQueue(
347351
initial_links=[sample_external_link],
348352
max_download_threads=5,
@@ -608,13 +612,14 @@ def test_create_next_table_success(self, mock_logger):
608612
# Mock the dependencies
609613
mock_table = Mock()
610614
queue._create_table_at_offset = Mock(return_value=mock_table)
615+
queue._create_table_from_link = Mock(return_value=mock_table)
611616
queue._progress_chunk_link = Mock()
612617

613618
# Call the method directly
614619
result = SeaCloudFetchQueue._create_next_table(queue)
615620

616621
# Verify the table was created
617-
queue._create_table_at_offset.assert_called_once_with(50)
622+
queue._create_table_from_link.assert_called_once_with(queue._current_chunk_link)
618623

619624
# Verify progress was called
620625
queue._progress_chunk_link.assert_called_once()

0 commit comments

Comments
 (0)