Skip to content

Commit 4abb3ad

Browse files
add link to download manager before notifying consumer
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 6894b4c commit 4abb3ad

File tree

1 file changed

+11
-13
lines changed

1 file changed

+11
-13
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def __init__(
152152

153153
self._shutdown_event = threading.Event()
154154

155-
self._link_present = threading.Condition()
155+
self._link_data_update = threading.Condition()
156156
self._error: Optional[Exception] = None
157157
self.chunk_index_to_link: Dict[int, "ExternalLink"] = {}
158158

@@ -162,7 +162,7 @@ def __init__(
162162
self.total_chunk_count = total_chunk_count
163163

164164
def _get_next_chunk_index(self) -> Optional[int]:
165-
with self._link_present:
165+
with self._link_data_update:
166166
max_chunk_index = max(self.chunk_index_to_link.keys(), default=None)
167167
if max_chunk_index is None:
168168
return 0
@@ -176,20 +176,18 @@ def _trigger_next_batch_download(self) -> bool:
176176

177177
try:
178178
links = self.backend.get_chunk_links(self._statement_id, next_chunk_index)
179-
with self._link_present:
180-
self.chunk_index_to_link.update(
181-
{link.chunk_index: link for link in links}
182-
)
183-
self._link_present.notify_all()
184-
for link in links:
185-
self.download_manager.add_link(self._convert_to_thrift_link(link))
179+
with self._link_data_update:
180+
for l in links:
181+
self.chunk_index_to_link[l.chunk_index] = l
182+
self.download_manager.add_link(self._convert_to_thrift_link(l))
183+
self._link_data_update.notify_all()
186184
except Exception as e:
187185
logger.error(
188186
f"LinkFetcher: Error fetching links for chunk {next_chunk_index}: {e}"
189187
)
190-
with self._link_present:
188+
with self._link_data_update:
191189
self._error = e
192-
self._link_present.notify_all()
190+
self._link_data_update.notify_all()
193191
return False
194192

195193
return True
@@ -198,11 +196,11 @@ def get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
198196
if chunk_index >= self.total_chunk_count:
199197
return None
200198

201-
with self._link_present:
199+
with self._link_data_update:
202200
while chunk_index not in self.chunk_index_to_link:
203201
if self._error:
204202
raise self._error
205-
self._link_present.wait()
203+
self._link_data_update.wait()
206204

207205
return self.chunk_index_to_link.get(chunk_index, None)
208206

0 commit comments

Comments
 (0)