@@ -24,9 +24,6 @@ def __init__(
2424 ssl_options : SSLOptions ,
2525 ):
2626 self ._pending_links : List [TSparkArrowResultLink ] = []
27- # Add a cache to store downloaded files by row offset
28- self ._downloaded_files_cache : dict [int , DownloadedFile ] = {}
29-
3027 for link in links :
3128 if link .rowCount <= 0 :
3229 continue
@@ -59,97 +56,30 @@ def get_next_downloaded_file(
5956 Args:
6057 next_row_offset (int): The offset of the starting row of the next file we want data from.
6158 """
62- logger .info (
63- f"ResultFileDownloadManager: get_next_downloaded_file for row offset { next_row_offset } "
64- )
65-
66- # Check if we have this file in the cache
67- if next_row_offset in self ._downloaded_files_cache :
68- logger .info (
69- f"ResultFileDownloadManager: Found file in cache for row offset { next_row_offset } "
70- )
71- return self ._downloaded_files_cache [next_row_offset ]
7259
7360 # Make sure the download queue is always full
7461 self ._schedule_downloads ()
7562
7663 # No more files to download from this batch of links
7764 if len (self ._download_tasks ) == 0 :
78- logger .info ("ResultFileDownloadManager: No more download tasks" )
7965 self ._shutdown_manager ()
8066 return None
8167
82- # Log all pending download tasks
83- logger .info (
84- f"ResultFileDownloadManager: { len (self ._download_tasks )} download tasks pending"
85- )
86-
87- # Find the task that matches the requested row offset
88- matching_task_index = None
89- for i , task in enumerate (self ._download_tasks ):
90- if task .done ():
91- try :
92- file = task .result (timeout = 0 ) # Don't block
93- logger .info (
94- f"Task { i } : start_row_offset={ file .start_row_offset } , row_count={ file .row_count } "
95- )
96- if file .start_row_offset == next_row_offset :
97- matching_task_index = i
98- break
99- except Exception as e :
100- logger .error (f"Error getting task result: { e } " )
101-
102- # If we found a matching task, use it
103- if matching_task_index is not None :
104- logger .info (
105- f"ResultFileDownloadManager: Found matching task at index { matching_task_index } "
106- )
107- task = self ._download_tasks .pop (matching_task_index )
108- file = task .result ()
109- # Cache the file for future use
110- self ._downloaded_files_cache [file .start_row_offset ] = file
111- return file
112-
113- # If we didn't find a matching task, wait for all tasks to complete and check again
114- logger .info (
115- "ResultFileDownloadManager: No matching task found, waiting for all tasks to complete"
116- )
117- completed_files = []
118- for task in self ._download_tasks :
119- try :
120- file = task .result () # Wait for the task to complete
121- completed_files .append (file )
122- # Cache the file for future use
123- self ._downloaded_files_cache [file .start_row_offset ] = file
124- logger .info (
125- f"Completed file: start_row_offset={ file .start_row_offset } , row_count={ file .row_count } "
68+ task = self ._download_tasks .pop (0 )
69+ # Future's `result()` method will wait for the call to complete, and return
70+ # the value returned by the call. If the call throws an exception - `result()`
71+ # will throw the same exception
72+ file = task .result ()
73+ if (next_row_offset < file .start_row_offset ) or (
74+ next_row_offset > file .start_row_offset + file .row_count
75+ ):
76+ logger .debug (
77+ "ResultFileDownloadManager: file does not contain row {}, start {}, row count {}" .format (
78+ next_row_offset , file .start_row_offset , file .row_count
12679 )
127- except Exception as e :
128- logger .error (f"Error getting task result: { e } " )
129-
130- # Clear the download tasks since we've processed them all
131- self ._download_tasks = []
132-
133- # Check if any of the completed files match the requested offset
134- matching_file = next (
135- (f for f in completed_files if f .start_row_offset == next_row_offset ), None
136- )
137- if matching_file :
138- logger .info (
139- f"ResultFileDownloadManager: Found matching file with offset { next_row_offset } "
14080 )
141- return matching_file
142-
143- # If we still don't have a matching file, log the issue and return None
144- logger .warning (
145- f"ResultFileDownloadManager: No file found with row offset { next_row_offset } "
146- )
147- # Log cache contents for debugging
148- logger .info ("ResultFileDownloadManager: Cache contents:" )
149- for offset , cached_file in self ._downloaded_files_cache .items ():
150- logger .info (f" offset={ offset } , row_count={ cached_file .row_count } " )
15181
152- return None
82+ return file
15383
15484 def _schedule_downloads (self ):
15585 """
@@ -171,26 +101,6 @@ def _schedule_downloads(self):
171101 task = self ._thread_pool .submit (handler .run )
172102 self ._download_tasks .append (task )
173103
174- def add_links (self , links : List [TSparkArrowResultLink ]):
175- """
176- Add more links to the download manager.
177-
178- Args:
179- links: List of links to add
180- """
181- for link in links :
182- if link .rowCount <= 0 :
183- continue
184- logger .debug (
185- "ResultFileDownloadManager: adding file link, start offset {}, row count: {}" .format (
186- link .startRowOffset , link .rowCount
187- )
188- )
189- self ._pending_links .append (link )
190-
191- # Make sure the download queue is always full
192- self ._schedule_downloads ()
193-
194104 def _shutdown_manager (self ):
195105 # Clear download handlers and shutdown the thread pool
196106 self ._pending_links = []
0 commit comments