@@ -242,56 +242,43 @@ def __init__(
242242 self .download_manager : Optional ["ResultFileDownloadManager" ] = None
243243
def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
    """
    Get up to the next n rows of the cloud fetch Arrow dataframes.

    Rows are read from the current table starting at ``table_row_index``.
    Each time the current table is fully consumed it is replaced by the
    result of ``_create_next_table()`` and the row index is reset to 0, so a
    single call may span several tables.

    Args:
        num_rows (int): Maximum number of rows to retrieve.
    Returns:
        pyarrow.Table: The collected rows (fewer than ``num_rows`` if the
        tables run out first), or the result of ``_create_empty_table()``
        when there is no current table to read from.
    """
    if not self.table:
        logger.debug("CloudFetchQueue: no more rows available")
        # Return empty pyarrow table to cause retry of fetch
        return self._create_empty_table()
    logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))

    # Zero-row slice of the current table; returned as-is when nothing is
    # collected (e.g. num_rows <= 0) so the caller still gets a table.
    empty_result = self.table.slice(0, 0)

    # Collect the per-table slices and concatenate once at the end instead of
    # re-concatenating the growing result on every iteration.
    slices = []

    while num_rows > 0 and self.table:
        # Get remaining of num_rows or the rest of the current table, whichever is smaller
        length = min(num_rows, self.table.num_rows - self.table_row_index)
        table_slice = self.table.slice(self.table_row_index, length)
        slices.append(table_slice)

        self.table_row_index += table_slice.num_rows

        # Replace current table with the next table if we are at the end of the current table
        if self.table_row_index == self.table.num_rows:
            self.table = self._create_next_table()
            self.table_row_index = 0

        num_rows -= table_slice.num_rows

    if not slices:
        results = empty_result
    elif len(slices) == 1:
        # Single slice: hand it back directly, no concatenation needed
        results = slices[0]
    else:
        results = pyarrow.concat_tables(slices)

    logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))
    return results
296283
297284 def remaining_rows (self ) -> "pyarrow.Table" :
0 commit comments