22
33from abc import ABC
44import threading
5+ import time
56from typing import Dict , List , Optional , Tuple , Union
67
78from databricks .sql .cloudfetch .download_manager import ResultFileDownloadManager
@@ -152,16 +153,17 @@ def __init__(
152153
153154 self ._shutdown_event = threading .Event ()
154155
155- self ._condition = threading .Condition ()
156+ self ._link_present = threading .Condition ()
156157 self ._error = None
157158 self .chunk_index_to_link : Dict [int , "ExternalLink" ] = {}
159+
158160 for link in initial_links :
159161 self .chunk_index_to_link [link .chunk_index ] = link
160162 self .download_manager .add_link (self ._convert_to_thrift_link (link ))
161163 self .total_chunk_count = total_chunk_count
162164
163165 def _get_next_chunk_index (self ) -> Optional [int ]:
164- with self ._condition :
166+ with self ._link_present :
165167 max_chunk_index = max (self .chunk_index_to_link .keys (), default = None )
166168 if max_chunk_index is None :
167169 return 0
@@ -175,20 +177,20 @@ def _trigger_next_batch_download(self) -> bool:
175177
176178 try :
177179 links = self .backend .get_chunk_links (self ._statement_id , next_chunk_index )
178- with self ._condition :
180+ with self ._link_present :
179181 self .chunk_index_to_link .update (
180182 {link .chunk_index : link for link in links }
181183 )
182- self ._condition .notify_all ()
184+ self ._link_present .notify_all ()
183185 for link in links :
184186 self .download_manager .add_link (self ._convert_to_thrift_link (link ))
185187 except Exception as e :
186188 logger .error (
187189 f"LinkFetcher: Error fetching links for chunk { next_chunk_index } : { e } "
188190 )
189- with self ._condition :
191+ with self ._link_present :
190192 self ._error = e
191- self ._condition .notify_all ()
193+ self ._link_present .notify_all ()
192194 return False
193195
194196 return True
@@ -197,14 +199,11 @@ def get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
197199 if chunk_index >= self .total_chunk_count :
198200 return None
199201
200- with self ._condition :
201- if self ._error :
202- raise self ._error
203-
202+ with self ._link_present :
204203 while chunk_index not in self .chunk_index_to_link :
205204 if self ._error :
206205 raise self ._error
207- self ._condition .wait ()
206+ self ._link_present .wait ()
208207
209208 return self .chunk_index_to_link .get (chunk_index , None )
210209
@@ -308,6 +307,7 @@ def __init__(
308307
309308 def _create_next_table (self ) -> Union ["pyarrow.Table" , None ]:
310309 """Create next table by retrieving the logical next downloaded file."""
310+ start_time = time .time ()
311311 if not self .download_manager :
312312 logger .debug ("SeaCloudFetchQueue: No download manager, returning" )
313313 return None
@@ -319,6 +319,11 @@ def _create_next_table(self) -> Union["pyarrow.Table", None]:
319319 row_offset = chunk_link .row_offset
320320 arrow_table = self ._create_table_at_offset (row_offset )
321321
322+ end_time = time .time ()
323+ logger .info (
324+ f"SeaCloudFetchQueue: Created table at offset { row_offset } in { end_time - start_time } seconds"
325+ )
326+
322327 self .current_chunk_index += 1
323328
324329 return arrow_table
0 commit comments