@@ -141,10 +141,13 @@ def __init__(
141141 self ._error : Optional [Exception ] = None
142142 self .chunk_index_to_link : Dict [int , "ExternalLink" ] = {}
143143
144- for link in initial_links :
144+ self ._add_links (initial_links )
145+ self .total_chunk_count = total_chunk_count
146+
147+ def _add_links (self , links : List ["ExternalLink" ]):
148+ for link in links :
145149 self .chunk_index_to_link [link .chunk_index ] = link
146150 self .download_manager .add_link (LinkFetcher ._convert_to_thrift_link (link ))
147- self .total_chunk_count = total_chunk_count
148151
149152 def _get_next_chunk_index (self ) -> Optional [int ]:
150153 with self ._link_data_update :
@@ -162,9 +165,7 @@ def _trigger_next_batch_download(self) -> bool:
162165 try :
163166 links = self .backend .get_chunk_links (self ._statement_id , next_chunk_index )
164167 with self ._link_data_update :
165- for l in links :
166- self .chunk_index_to_link [l .chunk_index ] = l
167- self .download_manager .add_link (self ._convert_to_thrift_link (l ))
168+ self ._add_links (links )
168169 self ._link_data_update .notify_all ()
169170 except Exception as e :
170171 logger .error (
@@ -185,6 +186,12 @@ def get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
185186 while chunk_index not in self .chunk_index_to_link :
186187 if self ._error :
187188 raise self ._error
189+ if self ._shutdown_event .is_set ():
190+ raise ProgrammingError (
191+ "LinkFetcher is shutting down without providing link for chunk index {}" .format (
192+ chunk_index
193+ )
194+ )
188195 self ._link_data_update .wait ()
189196
190197 return self .chunk_index_to_link .get (chunk_index , None )
0 commit comments