Skip to content

Commit 7dffcff

Browse files
authored
Fix issue with querying request queue head multiple times in parallel (#113)
When calling `_ensure_head_is_non_empty` on `RequestQueue`, the call to query the RQ head is cached (as an optimization) so that multiple parallel calls to the storage are not made, and in each call of `_ensure_head_is_non_empty` the cached call is then awaited. In Python (unlike JavaScript), you can't await the result of an async function multiple times; awaiting an already-awaited coroutine raises a `RuntimeError`. To get around that, you can wrap the result of the coroutine into an `asyncio.Task`, and that you can await multiple times (`asyncio.Task` behaves sort of like a JS `Promise` in this regard).
1 parent 8d5ba63 commit 7dffcff

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Changelog
44
[1.1.4](../../releases/tag/v1.1.4) - Unreleased
55
-----------------------------------------------
66

7+
### Fixes
8+
9+
- resolved issue with querying request queue head multiple times in parallel
10+
711
### Internal changes
812

913
- Fixed integration tests for Actor logger

src/apify/storages/request_queue.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from collections import OrderedDict
33
from datetime import datetime, timezone
4-
from typing import Coroutine, Dict, Optional
4+
from typing import Dict, Optional
55
from typing import OrderedDict as OrderedDictType
66
from typing import Set, Union
77

@@ -75,7 +75,7 @@ class RequestQueue(BaseStorage):
7575
_request_queue_client: Union[RequestQueueClientAsync, RequestQueueClient]
7676
_client_key = _crypto_random_object_id()
7777
_queue_head_dict: OrderedDictType[str, str]
78-
_query_queue_head_promise: Optional[Coroutine]
78+
_query_queue_head_task: Optional[asyncio.Task]
7979
_in_progress: Set[str]
8080
_last_activity: datetime
8181
_internal_timeout_seconds = 5 * 60
@@ -100,7 +100,7 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync,
100100

101101
self._request_queue_client = client.request_queue(self._id, client_key=self._client_key)
102102
self._queue_head_dict = OrderedDict()
103-
self._query_queue_head_promise = None
103+
self._query_queue_head_task = None
104104
self._in_progress = set()
105105
self._last_activity = datetime.now(timezone.utc)
106106
self._recently_handled = LRUCache[bool](max_length=RECENTLY_HANDLED_CACHE_SIZE)
@@ -369,7 +369,7 @@ async def is_finished(self) -> bool:
369369

370370
def _reset(self) -> None:
371371
self._queue_head_dict.clear()
372-
self._query_queue_head_promise = None
372+
self._query_queue_head_task = None
373373
self._in_progress.clear()
374374
self._recently_handled.clear()
375375
self._assumed_total_count = 0
@@ -402,7 +402,7 @@ async def _queue_query_head(self, limit: int) -> Dict:
402402
})
403403

404404
# This is needed so that the next call to _ensureHeadIsNonEmpty() will fetch the queue head again.
405-
self._query_queue_head_promise = None
405+
self._query_queue_head_task = None
406406

407407
return {
408408
'wasLimitReached': len(list_head['items']) >= limit,
@@ -420,15 +420,15 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi
420420
if limit is None:
421421
limit = max(self._in_progress_count() * QUERY_HEAD_BUFFER, QUERY_HEAD_MIN_LENGTH)
422422

423-
if self._query_queue_head_promise is None:
424-
self._query_queue_head_promise = self._queue_query_head(limit)
423+
if self._query_queue_head_task is None:
424+
self._query_queue_head_task = asyncio.Task(self._queue_query_head(limit))
425425

426-
queue_head = await self._query_queue_head_promise
426+
queue_head = await self._query_queue_head_task
427427

428428
# TODO: I feel this code below can be greatly simplified... (comes from TS implementation *wink*)
429429

430430
""" If queue is still empty then one of the following holds:
431-
- the other calls waiting for this promise already consumed all the returned requests
431+
- the other calls waiting for this task already consumed all the returned requests
432432
- the limit was too low and contained only requests in progress
433433
- the writes from other clients were not propagated yet
434434
- the whole queue was processed and we are done

0 commit comments

Comments
 (0)