Skip to content

Commit 7ce8745

Browse files
iterate over chunk indexes instead of link
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 28c6bb1 commit 7ce8745

File tree

2 files changed

+24
-164
lines changed

2 files changed

+24
-164
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def __init__(
140140

141141
self._sea_client = sea_client
142142
self._statement_id = statement_id
143+
self._total_chunk_count = total_chunk_count
143144

144145
logger.debug(
145146
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
@@ -154,10 +155,9 @@ def __init__(
154155
return None
155156

156157
# Track the current chunk we're processing
157-
self._current_chunk_link = first_link
158-
158+
self._current_chunk_index = 0
159159
# Initialize table and position
160-
self.table = self._create_table_from_link(self._current_chunk_link)
160+
self.table = self._create_table_from_link(first_link)
161161

162162
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
163163
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
@@ -172,34 +172,24 @@ def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink
172172
httpHeaders=link.http_headers or {},
173173
)
174174

175-
def _progress_chunk_link(self):
175+
def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
176176
"""Progress to the next chunk link."""
177-
if not self._current_chunk_link:
178-
return None
179-
180-
next_chunk_index = self._current_chunk_link.next_chunk_index
181-
182-
if next_chunk_index is None:
183-
self._current_chunk_link = None
177+
if chunk_index >= self._total_chunk_count:
184178
return None
185179

186180
try:
187-
self._current_chunk_link = self._sea_client.get_chunk_link(
188-
self._statement_id, next_chunk_index
181+
return self._sea_client.get_chunk_link(
182+
self._statement_id, chunk_index
189183
)
190184
except Exception as e:
191185
raise ServerOperationError(
192-
f"Error fetching link for chunk {next_chunk_index}: {e}",
186+
f"Error fetching link for chunk {chunk_index}: {e}",
193187
{
194188
"operation-id": self._statement_id,
195189
"diagnostic-info": None,
196190
},
197191
)
198192

199-
logger.debug(
200-
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
201-
)
202-
203193
def _create_table_from_link(
204194
self, link: "ExternalLink"
205195
) -> Union["pyarrow.Table", None]:
@@ -216,10 +206,11 @@ def _create_table_from_link(
216206
def _create_next_table(self) -> Union["pyarrow.Table", None]:
217207
"""Create next table by retrieving the logical next downloaded file."""
218208

219-
self._progress_chunk_link()
209+
print(self._current_chunk_index)
210+
self._current_chunk_index += 1
211+
next_chunk_link = self._get_chunk_link(self._current_chunk_index)
220212

221-
if not self._current_chunk_link:
213+
if not next_chunk_link:
222214
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
223215
return None
224-
225-
return self._create_table_from_link(self._current_chunk_link)
216+
return self._create_table_from_link(next_chunk_link)

tests/unit/test_sea_queue.py

Lines changed: 11 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ def test_init_with_valid_initial_link(
364364

365365
# Verify attributes
366366
assert queue._statement_id == "test-statement-123"
367-
assert queue._current_chunk_link == sample_external_link
367+
assert queue._current_chunk_index == 0
368368

369369
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
370370
@patch("databricks.sql.backend.sea.queue.logger")
@@ -390,162 +390,31 @@ def test_init_no_initial_links(
390390
)
391391
assert queue.table is None
392392

393-
@patch("databricks.sql.backend.sea.queue.logger")
394-
def test_progress_chunk_link_no_current_link(self, mock_logger):
395-
"""Test _progress_chunk_link with no current link."""
396-
# Create a queue instance without initializing
397-
queue = Mock(spec=SeaCloudFetchQueue)
398-
queue._current_chunk_link = None
399-
400-
# Call the method directly
401-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
402-
403-
# Verify the result is None
404-
assert result is None
405-
406-
@patch("databricks.sql.backend.sea.queue.logger")
407-
def test_progress_chunk_link_no_next_chunk(self, mock_logger):
408-
"""Test _progress_chunk_link with no next chunk index."""
409-
# Create a queue instance without initializing
410-
queue = Mock(spec=SeaCloudFetchQueue)
411-
queue._current_chunk_link = ExternalLink(
412-
external_link="https://example.com/data/chunk0",
413-
expiration="2025-07-03T05:51:18.118009",
414-
row_count=100,
415-
byte_count=1024,
416-
row_offset=0,
417-
chunk_index=0,
418-
next_chunk_index=None,
419-
http_headers={"Authorization": "Bearer token123"},
420-
)
421-
422-
# Call the method directly
423-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
424-
425-
# Verify the result is None
426-
assert result is None
427-
assert queue._current_chunk_link is None
428-
429-
@patch("databricks.sql.backend.sea.queue.logger")
430-
def test_progress_chunk_link_success(self, mock_logger, mock_sea_client):
431-
"""Test _progress_chunk_link with successful progression."""
432-
# Create a queue instance without initializing
433-
queue = Mock(spec=SeaCloudFetchQueue)
434-
queue._current_chunk_link = ExternalLink(
435-
external_link="https://example.com/data/chunk0",
436-
expiration="2025-07-03T05:51:18.118009",
437-
row_count=100,
438-
byte_count=1024,
439-
row_offset=0,
440-
chunk_index=0,
441-
next_chunk_index=1,
442-
http_headers={"Authorization": "Bearer token123"},
443-
)
444-
queue._sea_client = mock_sea_client
445-
queue._statement_id = "test-statement-123"
446-
447-
# Setup the mock client to return a new link
448-
next_link = ExternalLink(
449-
external_link="https://example.com/data/chunk1",
450-
expiration="2025-07-03T05:51:18.235843",
451-
row_count=50,
452-
byte_count=512,
453-
row_offset=100,
454-
chunk_index=1,
455-
next_chunk_index=None,
456-
http_headers={"Authorization": "Bearer token123"},
457-
)
458-
mock_sea_client.get_chunk_link.return_value = next_link
459-
460-
# Call the method directly
461-
SeaCloudFetchQueue._progress_chunk_link(queue)
462-
463-
# Verify the client was called
464-
mock_sea_client.get_chunk_link.assert_called_once_with("test-statement-123", 1)
465-
466-
# Verify debug message was logged
467-
mock_logger.debug.assert_called_with(
468-
f"SeaCloudFetchQueue: Progressed to link for chunk 1: {next_link}"
469-
)
470-
471-
@patch("databricks.sql.backend.sea.queue.logger")
472-
def test_progress_chunk_link_error(self, mock_logger, mock_sea_client):
473-
"""Test _progress_chunk_link with error during chunk fetch."""
474-
# Create a queue instance without initializing
475-
queue = Mock(spec=SeaCloudFetchQueue)
476-
queue._current_chunk_link = ExternalLink(
477-
external_link="https://example.com/data/chunk0",
478-
expiration="2025-07-03T05:51:18.118009",
479-
row_count=100,
480-
byte_count=1024,
481-
row_offset=0,
482-
chunk_index=0,
483-
next_chunk_index=1,
484-
http_headers={"Authorization": "Bearer token123"},
485-
)
486-
queue._sea_client = mock_sea_client
487-
queue._statement_id = "test-statement-123"
488-
489-
# Setup the mock client to raise an error
490-
error_message = "Network error"
491-
mock_sea_client.get_chunk_link.side_effect = Exception(error_message)
492-
493-
# Call the method directly
494-
with pytest.raises(ServerOperationError, match=error_message):
495-
SeaCloudFetchQueue._progress_chunk_link(queue)
496-
497-
# Verify the client was called
498-
mock_sea_client.get_chunk_link.assert_called_once_with("test-statement-123", 1)
499-
500-
@patch("databricks.sql.backend.sea.queue.logger")
501-
def test_create_next_table_no_current_link(self, mock_logger):
502-
"""Test _create_next_table with no current link."""
503-
# Create a queue instance without initializing
504-
queue = Mock(spec=SeaCloudFetchQueue)
505-
queue._current_chunk_link = None
506-
507-
# Call the method directly
508-
result = SeaCloudFetchQueue._create_next_table(queue)
509-
510-
# Verify debug message was logged
511-
mock_logger.debug.assert_called_with(
512-
"SeaCloudFetchQueue: No current chunk link, returning"
513-
)
514-
515-
# Verify the result is None
516-
assert result is None
517-
518393
@patch("databricks.sql.backend.sea.queue.logger")
519394
def test_create_next_table_success(self, mock_logger):
520395
"""Test _create_next_table with successful table creation."""
521396
# Create a queue instance without initializing
522397
queue = Mock(spec=SeaCloudFetchQueue)
523-
queue._current_chunk_link = ExternalLink(
524-
external_link="https://example.com/data/chunk0",
525-
expiration="2025-07-03T05:51:18.118009",
526-
row_count=100,
527-
byte_count=1024,
528-
row_offset=50,
529-
chunk_index=0,
530-
next_chunk_index=1,
531-
http_headers={"Authorization": "Bearer token123"},
532-
)
398+
queue._current_chunk_index = 0
533399
queue.download_manager = Mock()
534400

535401
# Mock the dependencies
536402
mock_table = Mock()
537-
queue._create_table_at_offset = Mock(return_value=mock_table)
403+
mock_chunk_link = Mock()
404+
queue._get_chunk_link = Mock(return_value=mock_chunk_link)
538405
queue._create_table_from_link = Mock(return_value=mock_table)
539-
queue._progress_chunk_link = Mock()
540406

541407
# Call the method directly
542408
result = SeaCloudFetchQueue._create_next_table(queue)
543409

544-
# Verify the table was created
545-
queue._create_table_from_link.assert_called_once_with(queue._current_chunk_link)
410+
# Verify the chunk index was incremented
411+
assert queue._current_chunk_index == 1
412+
413+
# Verify the chunk link was retrieved
414+
queue._get_chunk_link.assert_called_once_with(1)
546415

547-
# Verify progress was called
548-
queue._progress_chunk_link.assert_called_once()
416+
# Verify the table was created from the link
417+
queue._create_table_from_link.assert_called_once_with(mock_chunk_link)
549418

550419
# Verify the result is the table
551420
assert result == mock_table

0 commit comments

Comments
 (0)