Skip to content

Commit 2701e5d

Browse files
iterate by chunk index instead of link
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 5791745 commit 2701e5d

File tree

2 files changed

+22
-42
lines changed

2 files changed

+22
-42
lines changed

src/databricks/sql/backend/sea/backend.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ def get_chunk_links(
661661
)
662662
response = GetChunksResponse.from_dict(response_data)
663663

664-
links = response.external_links
664+
links = response.external_links or []
665665
return links
666666

667667
# == Metadata Operations ==

src/databricks/sql/backend/sea/queue.py

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,12 @@ def __init__(
167167
)
168168
)
169169

170-
initial_links = result_data.external_links
170+
initial_links = result_data.external_links or []
171171
self._chunk_index_to_link = {link.chunk_index: link for link in initial_links}
172172

173-
first_link = self._chunk_index_to_link.get(0, None)
173+
# Track the current chunk we're processing
174+
self._current_chunk_index = 0
175+
first_link = self._chunk_index_to_link.get(self._current_chunk_index, None)
174176
if not first_link:
175177
# possibly an empty response
176178
return
@@ -182,11 +184,8 @@ def __init__(
182184
ssl_options=ssl_options,
183185
)
184186

185-
# Track the current chunk we're processing
186-
self._current_chunk_link = first_link
187-
188187
# Initialize table and position
189-
self.table = self._create_table_from_link(self._current_chunk_link)
188+
self.table = self._create_table_from_link(first_link)
190189

191190
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
192191
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
@@ -203,38 +202,22 @@ def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink
203202

204203
def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
205204
if chunk_index >= self._total_chunk_count:
206-
raise ValueError(
207-
f"Chunk index {chunk_index} is greater than total chunk count {self._total_chunk_count}"
208-
)
205+
return None
209206

210207
if chunk_index not in self._chunk_index_to_link:
211208
links = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
212-
self._chunk_index_to_link.update({link.chunk_index: link for link in links})
213-
return self._chunk_index_to_link.get(chunk_index, None)
214-
215-
def _progress_chunk_link(self):
216-
"""Progress to the next chunk link."""
217-
if not self._current_chunk_link:
218-
return None
219-
220-
next_chunk_index = self._current_chunk_link.next_chunk_index
221-
222-
if next_chunk_index is None:
223-
self._current_chunk_link = None
224-
return None
225-
226-
self._current_chunk_link = self._get_chunk_link(next_chunk_index)
227-
if not self._current_chunk_link:
228-
logger.error(
229-
"SeaCloudFetchQueue: unable to retrieve link for chunk {}".format(
230-
next_chunk_index
231-
)
209+
self._chunk_index_to_link.update({l.chunk_index: l for l in links})
210+
211+
link = self._chunk_index_to_link.get(chunk_index, None)
212+
if not link:
213+
raise ServerOperationError(
214+
f"Chunk index {chunk_index} is not in the chunk index to link map",
215+
{
216+
"operation-id": self._statement_id,
217+
"diagnostic-info": None,
218+
},
232219
)
233-
return None
234-
235-
logger.debug(
236-
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
237-
)
220+
return link
238221

239222
def _create_table_from_link(
240223
self, link: "ExternalLink"
@@ -251,11 +234,8 @@ def _create_table_from_link(
251234

252235
def _create_next_table(self) -> Union["pyarrow.Table", None]:
253236
"""Create next table by retrieving the logical next downloaded file."""
254-
255-
self._progress_chunk_link()
256-
257-
if not self._current_chunk_link:
258-
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
237+
self._current_chunk_index += 1
238+
next_chunk_link = self._get_chunk_link(self._current_chunk_index)
239+
if not next_chunk_link:
259240
return None
260-
261-
return self._create_table_from_link(self._current_chunk_link)
241+
return self._create_table_from_link(next_chunk_link)

0 commit comments

Comments
 (0)