
Commit 58d1d7f

reduce repetition in cloud fetch queue
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 9ddb3ec commit 58d1d7f

File tree

1 file changed: +31 -63 lines changed

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 31 additions & 63 deletions
@@ -162,6 +162,33 @@ def __init__(
         # Initialize download manager - will be set by subclasses
         self.download_manager: Optional["ResultFileDownloadManager"] = None
 
+    def remaining_rows(self) -> "pyarrow.Table":
+        """
+        Get all remaining rows of the cloud fetch Arrow dataframes.
+
+        Returns:
+            pyarrow.Table
+        """
+        if not self.table:
+            # Return empty pyarrow table to cause retry of fetch
+            return self._create_empty_table()
+
+        results = pyarrow.Table.from_pydict({})  # Empty table
+        while self.table:
+            table_slice = self.table.slice(
+                self.table_row_index, self.table.num_rows - self.table_row_index
+            )
+            if results.num_rows > 0:
+                results = pyarrow.concat_tables([results, table_slice])
+            else:
+                results = table_slice
+
+            self.table_row_index += table_slice.num_rows
+            self.table = self._create_next_table()
+            self.table_row_index = 0
+
+        return results
+
     def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
         """Get up to the next n rows of the cloud fetch Arrow dataframes."""
         if not self.table:
@@ -216,20 +243,15 @@ def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
         logger.info("SeaCloudFetchQueue: Retrieved {} rows".format(results.num_rows))
         return results
 
-    @abstractmethod
-    def remaining_rows(self) -> "pyarrow.Table":
-        """Get all remaining rows of the cloud fetch Arrow dataframes."""
-        pass
+    def _create_empty_table(self) -> "pyarrow.Table":
+        """Create a 0-row table with just the schema bytes."""
+        return create_arrow_table_from_arrow_file(self.schema_bytes, self.description)
 
     @abstractmethod
     def _create_next_table(self) -> Union["pyarrow.Table", None]:
         """Create next table by retrieving the logical next downloaded file."""
         pass
 
-    def _create_empty_table(self) -> "pyarrow.Table":
-        """Create a 0-row table with just the schema bytes."""
-        return create_arrow_table_from_arrow_file(self.schema_bytes, self.description)
-
 
 class SeaCloudFetchQueue(CloudFetchQueue):
     """Queue implementation for EXTERNAL_LINKS disposition with ARROW format for SEA backend."""
@@ -283,7 +305,7 @@ def __init__(
         )
 
         if initial_links:
-            initial_links = []
+            initial_links = []
             # logger.debug("SeaCloudFetchQueue: Initial links provided:")
             # for link in initial_links:
             #     logger.debug(
@@ -386,33 +408,6 @@ def _fetch_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
 
         return link
 
-    def remaining_rows(self) -> "pyarrow.Table":
-        """
-        Get all remaining rows of the cloud fetch Arrow dataframes.
-
-        Returns:
-            pyarrow.Table
-        """
-        if not self.table:
-            # Return empty pyarrow table to cause retry of fetch
-            return self._create_empty_table()
-
-        results = pyarrow.Table.from_pydict({})  # Empty table
-        while self.table:
-            table_slice = self.table.slice(
-                self.table_row_index, self.table.num_rows - self.table_row_index
-            )
-            if results.num_rows > 0:
-                results = pyarrow.concat_tables([results, table_slice])
-            else:
-                results = table_slice
-
-            self.table_row_index += table_slice.num_rows
-            self.table = self._create_next_table()
-            self.table_row_index = 0
-
-        return results
-
     def _create_next_table(self) -> Union["pyarrow.Table", None]:
         """Create next table by retrieving the logical next downloaded file."""
         # if we're still processing the current table, just return it
@@ -596,33 +591,6 @@ def __init__(
         # Initialize table and position
         self.table = self._create_next_table()
 
-    def remaining_rows(self) -> "pyarrow.Table":
-        """
-        Get all remaining rows of the cloud fetch Arrow dataframes.
-
-        Returns:
-            pyarrow.Table
-        """
-        if not self.table:
-            # Return empty pyarrow table to cause retry of fetch
-            return self._create_empty_table()
-
-        results = pyarrow.Table.from_pydict({})  # Empty table
-        while self.table:
-            table_slice = self.table.slice(
-                self.table_row_index, self.table.num_rows - self.table_row_index
-            )
-            if results.num_rows > 0:
-                results = pyarrow.concat_tables([results, table_slice])
-            else:
-                results = table_slice
-
-            self.table_row_index += table_slice.num_rows
-            self.table = self._create_next_table()
-            self.table_row_index = 0
-
-        return results
-
     def _create_next_table(self) -> Union["pyarrow.Table", None]:
         """Create next table by retrieving the logical next downloaded file."""
         logger.debug(