1313from typing_extensions import override
1414
1515from apify_client import ApifyClientAsync
16+ from crawlee ._utils .crypto import crypto_random_object_id
1617from crawlee .storage_clients ._base import RequestQueueClient
1718from crawlee .storage_clients .models import AddRequestsResponse , ProcessedRequest , RequestQueueMetadata
1819
@@ -65,10 +66,7 @@ def __init__(
6566 self ,
6667 * ,
6768 api_client : RequestQueueClientAsync ,
68- id : str ,
69- name : str | None ,
70- total_request_count : int ,
71- handled_request_count : int ,
69+ metadata : RequestQueueMetadata ,
7270 ) -> None :
7371 """Initialize a new instance.
7472
@@ -77,11 +75,8 @@ def __init__(
7775 self ._api_client = api_client
7876 """The Apify request queue client for API operations."""
7977
80- self ._id = id
81- """The ID of the request queue."""
82-
83- self ._name = name
84- """The name of the request queue."""
78+ self ._metadata = metadata
79+ """Additional data related to the RequestQueue."""
8580
8681 self ._queue_head = deque [str ]()
8782 """A deque to store request unique keys in the queue head."""
@@ -95,40 +90,43 @@ def __init__(
9590 self ._should_check_for_forefront_requests = False
9691 """Whether to check for forefront requests in the next list_head call."""
9792
98- self ._had_multiple_clients = False
99- """Whether the request queue has been accessed by multiple clients."""
100-
101- self ._initial_total_count = total_request_count
102- """The initial total request count (from the API) when the queue was opened."""
103-
104- self ._initial_handled_count = handled_request_count
105- """The initial handled request count (from the API) when the queue was opened."""
106-
107- self ._assumed_total_count = 0
108- """The number of requests we assume are in the queue (tracked manually for this instance)."""
109-
110- self ._assumed_handled_count = 0
111- """The number of requests we assume have been handled (tracked manually for this instance)."""
112-
11393 self ._fetch_lock = asyncio .Lock ()
11494 """Fetch lock to minimize race conditions when communicating with API."""
11595
96+ async def _get_metadata_estimate (self ) -> RequestQueueMetadata :
97+ """Try to get cached metadata first. If multiple clients, fuse with global metadata.
98+
99+ This method is used internally to avoid unnecessary API call unless needed (multiple clients).
100+ Local estimation of metadata is without delay, unlike metadata from API. In situation where there is only one
101+ client, it is the better choice.
102+ """
103+ if self ._metadata .had_multiple_clients :
104+ return await self .get_metadata ()
105+ # Get local estimation (will not include changes done bo another client)
106+ return self ._metadata
107+
116108 @override
117109 async def get_metadata (self ) -> RequestQueueMetadata :
118- total_count = self ._initial_total_count + self ._assumed_total_count
119- handled_count = self ._initial_handled_count + self ._assumed_handled_count
120- pending_count = total_count - handled_count
110+ """Get metadata about the request queue.
121111
112+ Returns:
113+ Metadata from the API, merged with local estimation, because in some cases, the data from the API can
114+ be delayed.
115+ """
116+ response = await self ._api_client .get ()
117+ if response is None :
118+ raise ValueError ('Failed to fetch request queue metadata from the API.' )
119+ # Enhance API response by local estimations (API can be delayed few seconds, while local estimation not.)
122120 return RequestQueueMetadata (
123- id = self . _id ,
124- name = self . _name ,
125- total_request_count = total_count ,
126- handled_request_count = handled_count ,
127- pending_request_count = pending_count ,
128- created_at = datetime . now ( timezone . utc ),
129- modified_at = datetime . now ( timezone . utc ),
130- accessed_at = datetime . now ( timezone . utc ),
131- had_multiple_clients = self ._had_multiple_clients ,
121+ id = response [ 'id' ] ,
122+ name = response [ 'name' ] ,
123+ total_request_count = max ( response [ 'totalRequestCount' ], self . _metadata . total_request_count ) ,
124+ handled_request_count = max ( response [ 'handledRequestCount' ], self . _metadata . handled_request_count ) ,
125+ pending_request_count = response [ 'pendingRequestCount' ] ,
126+ created_at = min ( response [ 'createdAt' ], self . _metadata . created_at ),
127+ modified_at = max ( response [ 'modifiedAt' ], self . _metadata . modified_at ),
128+ accessed_at = max ( response [ 'accessedAt' ], self . _metadata . accessed_at ),
129+ had_multiple_clients = response [ 'hadMultipleClients' ] or self ._metadata . had_multiple_clients ,
132130 )
133131
134132 @classmethod
@@ -187,27 +185,34 @@ async def open(
187185 )
188186 apify_rqs_client = apify_client_async .request_queues ()
189187
190- # If both id and name are provided, raise an error.
191- if id and name :
192- raise ValueError ('Only one of "id" or "name" can be specified, not both.' )
193-
194- # If id is provided, get the storage by ID.
195- if id and name is None :
196- apify_rq_client = apify_client_async .request_queue (request_queue_id = id )
188+ match (id , name ):
189+ case (None , None ):
190+ # If both id and name are None, try to get the default storage ID from environment variables.
191+ # The default storage ID environment variable is set by the Apify platform. It also contains
192+ # a new storage ID after Actor's reboot or migration.
193+ id = configuration .default_request_queue_id
194+ case (None , name ):
195+ # If only name is provided, get or create the storage by name.
196+ id = RequestQueueMetadata .model_validate (
197+ await apify_rqs_client .get_or_create (name = name ),
198+ ).id
199+ case (_, None ):
200+ # If only id is provided, use it.
201+ pass
202+ case (_, _):
203+ # If both id and name are provided, raise an error.
204+ raise ValueError ('Only one of "id" or "name" can be specified, not both.' )
205+ if id is None :
206+ raise RuntimeError ('Unreachable code' )
197207
198- # If name is provided, get or create the storage by name .
199- if name and id is None :
200- id = RequestQueueMetadata . model_validate (
201- await apify_rqs_client . get_or_create ( name = name ),
202- ). id
203- apify_rq_client = apify_client_async . request_queue ( request_queue_id = id )
208+ # Use suitable client_key to make `hadMultipleClients` response of Apify API useful .
209+ # It should persist across migrated or resurrected Actor runs on the Apify platform.
210+ _api_max_client_key_length = 32
211+ client_key = ( configuration . actor_run_id or crypto_random_object_id ( length = _api_max_client_key_length ))[
212+ : _api_max_client_key_length
213+ ]
204214
205- # If both id and name are None, try to get the default storage ID from environment variables.
206- # The default storage ID environment variable is set by the Apify platform. It also contains
207- # a new storage ID after Actor's reboot or migration.
208- if id is None and name is None :
209- id = configuration .default_request_queue_id
210- apify_rq_client = apify_client_async .request_queue (request_queue_id = id )
215+ apify_rq_client = apify_client_async .request_queue (request_queue_id = id , client_key = client_key )
211216
212217 # Fetch its metadata.
213218 metadata = await apify_rq_client .get ()
@@ -217,27 +222,18 @@ async def open(
217222 id = RequestQueueMetadata .model_validate (
218223 await apify_rqs_client .get_or_create (),
219224 ).id
220- apify_rq_client = apify_client_async .request_queue (request_queue_id = id )
225+ apify_rq_client = apify_client_async .request_queue (request_queue_id = id , client_key = client_key )
221226
222227 # Verify that the storage exists by fetching its metadata again.
223228 metadata = await apify_rq_client .get ()
224229 if metadata is None :
225230 raise ValueError (f'Opening request queue with id={ id } and name={ name } failed.' )
226231
227- metadata_model = RequestQueueMetadata .model_validate (
228- await apify_rqs_client .get_or_create (),
229- )
230-
231- # Ensure we have a valid ID.
232- if id is None :
233- raise ValueError ('Request queue ID cannot be None.' )
232+ metadata_model = RequestQueueMetadata .model_validate (metadata )
234233
235234 return cls (
236235 api_client = apify_rq_client ,
237- id = id ,
238- name = name ,
239- total_request_count = metadata_model .total_request_count ,
240- handled_request_count = metadata_model .handled_request_count ,
236+ metadata = metadata_model ,
241237 )
242238
243239 @override
@@ -341,7 +337,7 @@ async def add_batch_of_requests(
341337 if not processed_request .was_already_present and not processed_request .was_already_handled :
342338 new_request_count += 1
343339
344- self ._assumed_total_count += new_request_count
340+ self ._metadata . total_request_count += new_request_count
345341
346342 return api_response
347343
@@ -439,7 +435,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
439435
440436 # Update assumed handled count if this wasn't already handled
441437 if not processed_request .was_already_handled :
442- self ._assumed_handled_count += 1
438+ self ._metadata . handled_request_count += 1
443439
444440 # Update the cache with the handled request
445441 cache_key = request .unique_key
@@ -487,7 +483,7 @@ async def reclaim_request(
487483 # If the request was previously handled, decrement our handled count since
488484 # we're putting it back for processing.
489485 if request .was_already_handled and not processed_request .was_already_handled :
490- self ._assumed_handled_count -= 1
486+ self ._metadata . handled_request_count -= 1
491487
492488 # Update the cache
493489 cache_key = request .unique_key
@@ -645,7 +641,7 @@ async def _list_head(
645641 if cached_request and cached_request .hydrated :
646642 items .append (cached_request .hydrated )
647643
648- metadata = await self .get_metadata ()
644+ metadata = await self ._get_metadata_estimate ()
649645
650646 return RequestQueueHead (
651647 limit = limit ,
@@ -672,6 +668,8 @@ async def _list_head(
672668
673669 # Update the queue head cache
674670 self ._queue_has_locked_requests = response .get ('queueHasLockedRequests' , False )
671+ # Check if there is another client working with the RequestQueue
672+ self ._metadata .had_multiple_clients = response .get ('hadMultipleClients' , False )
675673
676674 for request_data in response .get ('items' , []):
677675 request = Request .model_validate (request_data )
0 commit comments