Skip to content

Commit 23dba80

Browse files
account for empty response in LinkFetcher init
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent f2ea729 commit 23dba80

File tree

2 files changed

+22
-26
lines changed

2 files changed

+22
-26
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -263,53 +263,50 @@ def __init__(
263263
description=description,
264264
)
265265

266-
self._sea_client = sea_client
267-
self._statement_id = statement_id
268-
self._total_chunk_count = total_chunk_count
269-
270266
logger.debug(
271267
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
272268
statement_id, total_chunk_count
273269
)
274270
)
275271

276272
initial_links = result_data.external_links or []
277-
self._chunk_index_to_link = {link.chunk_index: link for link in initial_links}
278273

279274
# Track the current chunk we're processing
280275
self._current_chunk_index = 0
281-
first_link = self._chunk_index_to_link.get(self._current_chunk_index, None)
282-
if not first_link:
283-
# possibly an empty response
284-
return
285-
286-
self.current_chunk_index = 0
287-
288-
self.link_fetcher = LinkFetcher(
289-
download_manager=self.download_manager,
290-
backend=self._sea_client,
291-
statement_id=self._statement_id,
292-
initial_links=initial_links,
293-
total_chunk_count=total_chunk_count,
294-
)
295-
self.link_fetcher.start()
276+
277+
if total_chunk_count < 1:
278+
# an empty response
279+
self.link_fetcher = None
280+
else:
281+
self.link_fetcher = LinkFetcher(
282+
download_manager=self.download_manager,
283+
backend=sea_client,
284+
statement_id=statement_id,
285+
initial_links=initial_links,
286+
total_chunk_count=total_chunk_count,
287+
)
288+
self.link_fetcher.start()
296289

297290
# Initialize table and position
298291
self.table = self._create_next_table()
299292

300293
def _create_next_table(self) -> Union["pyarrow.Table", None]:
301294
"""Create next table by retrieving the logical next downloaded file."""
302-
chunk_link = self.link_fetcher.get_chunk_link(self.current_chunk_index)
295+
if self.link_fetcher is None:
296+
return None
297+
298+
chunk_link = self.link_fetcher.get_chunk_link(self._current_chunk_index)
303299
if not chunk_link:
304300
return None
305301

306302
row_offset = chunk_link.row_offset
307303
arrow_table = self._create_table_at_offset(row_offset)
308304

309-
self.current_chunk_index += 1
305+
self._current_chunk_index += 1
310306

311307
return arrow_table
312308

313309
def close(self):
314310
super().close()
315-
self.link_fetcher.stop()
311+
if self.link_fetcher:
312+
self.link_fetcher.stop()

tests/unit/test_sea_queue.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,6 @@ def test_init_with_valid_initial_link(
359359
)
360360

361361
# Verify attributes
362-
assert queue._statement_id == "test-statement-123"
363362
assert queue._current_chunk_index == 0
364363

365364
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
@@ -391,7 +390,7 @@ def test_create_next_table_success(self, mock_logger):
391390
"""Test _create_next_table with successful table creation."""
392391
# Create a queue instance without initializing
393392
queue = Mock(spec=SeaCloudFetchQueue)
394-
queue.current_chunk_index = 0
393+
queue._current_chunk_index = 0
395394
queue.download_manager = Mock()
396395
queue.link_fetcher = Mock()
397396

@@ -405,7 +404,7 @@ def test_create_next_table_success(self, mock_logger):
405404
SeaCloudFetchQueue._create_next_table(queue)
406405

407406
# Verify the chunk index was incremented
408-
assert queue.current_chunk_index == 1
407+
assert queue._current_chunk_index == 1
409408

410409
# Verify the chunk link was retrieved
411410
queue.link_fetcher.get_chunk_link.assert_called_once_with(0)

0 commit comments

Comments
 (0)