Skip to content

Commit e11c065

Browse files
notify listeners on scheduling downloads
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 746df87 commit e11c065

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

src/databricks/sql/cloudfetch/download_manager.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ def get_next_downloaded_file(self, next_row_offset: int) -> DownloadedFile:
5252
This function gets the next downloaded file in which its rows start at the specified next_row_offset
5353
in relation to the full result. File downloads are scheduled if not already, and once the correct
5454
download handler is located, the function waits for the download status and returns the resulting file.
55-
If there are no more downloads, a download was not successful, or the correct file could not be located,
56-
this function shuts down the thread pool and returns None.
5755
5856
Args:
5957
next_row_offset (int): The offset of the starting row of the next file we want data from.
@@ -63,7 +61,7 @@ def get_next_downloaded_file(self, next_row_offset: int) -> DownloadedFile:
6361
self._schedule_downloads()
6462

6563
# No more files to download from this batch of links
66-
if len(self._download_tasks) == 0:
64+
while len(self._download_tasks) == 0:
6765
if self._thread_pool._shutdown:
6866
raise Error("download manager shut down before file was ready")
6967
with self._download_condition:
@@ -105,6 +103,9 @@ def _schedule_downloads(self):
105103
task = self._thread_pool.submit(handler.run)
106104
self._download_tasks.append(task)
107105

106+
with self._download_condition:
107+
self._download_condition.notify_all()
108+
108109
def add_link(self, link: TSparkArrowResultLink):
109110
"""
110111
Add more links to the download manager.
@@ -123,6 +124,8 @@ def add_link(self, link: TSparkArrowResultLink):
123124
)
124125
self._pending_links.append(link)
125126

127+
self._schedule_downloads()
128+
126129
def _shutdown_manager(self):
127130
# Clear download handlers and shutdown the thread pool
128131
self._pending_links = []

0 commit comments

Comments
 (0)