@@ -10,7 +10,7 @@
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

 from apify import Request
-from apify.storage_clients._apify._utils import unique_key_to_request_id
+from apify.storage_clients._apify._utils import _Request, unique_key_to_request_id

 if TYPE_CHECKING:
     from collections.abc import Sequence
@@ -56,21 +56,21 @@ def __init__(
         """The Apify request queue client for API operations."""

         self._requests_cache: LRUCache[str, Request] = LRUCache(maxsize=cache_size)
-        """A cache to store request objects. Request unique key is used as the cache key."""
+        """A cache to store request objects. The request id is used as the cache key."""

         self._head_requests: deque[str] = deque()
-        """Ordered unique keys of requests that represent queue head."""
+        """Ordered ids of the requests that represent the queue head."""

         self._requests_already_handled: set[str] = set()
-        """Local estimation of requests unique keys that are already present and handled on the platform.
+        """Local estimate of request ids that are already present and handled on the platform.

         - To enhance local deduplication.
         - To reduce the _requests_cache size. Already handled requests are most likely not going to be needed again,
-          so no need to cache more than their unique_key.
+          so no need to cache more than their id.
         """

         self._requests_in_progress: set[str] = set()
-        """Set of requests unique keys that are being processed locally.
+        """Set of request ids that are being processed locally.

         - To help decide if the RQ is finished or not. This is the only consumer, so it can be tracked locally.
         """
@@ -105,25 +105,30 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []

         for request in requests:
+            # Compute the id for the request.
+            _request = _Request.model_validate(request.model_dump())
+
             # Check if the request is known to be already handled (it has to be present as well).
-            if request.unique_key in self._requests_already_handled:
+            if _request.id in self._requests_already_handled:
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'uniqueKey': request.unique_key,
+                            'id': _request.id,
+                            'uniqueKey': _request.unique_key,
                             'wasAlreadyPresent': True,
                             'wasAlreadyHandled': True,
                         }
                     )
                 )
             # Check if the request is known to be already present, but unhandled.
-            elif self._requests_cache.get(request.unique_key):
+            elif self._requests_cache.get(_request.id):
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'uniqueKey': request.unique_key,
+                            'id': _request.id,
+                            'uniqueKey': _request.unique_key,
                             'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
+                            'wasAlreadyHandled': _request.was_already_handled,
                         }
                     )
                 )
@@ -132,11 +137,11 @@ async def add_batch_of_requests(
                 new_requests.append(request)

                 # Update local caches
-                self._requests_cache[request.unique_key] = request
+                self._requests_cache[_request.id] = request
                 if forefront:
-                    self._head_requests.append(request.unique_key)
+                    self._head_requests.append(_request.id)
                 else:
-                    self._head_requests.appendleft(request.unique_key)
+                    self._head_requests.appendleft(_request.id)

         if new_requests:
             # Prepare requests for API by converting to dictionaries.
@@ -155,7 +160,7 @@ async def add_batch_of_requests(
             api_response.processed_requests.extend(already_present_requests)
             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)

         else:
             api_response = AddRequestsResponse.model_validate(
@@ -181,10 +186,21 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
         """
-        if unique_key in self._requests_cache:
-            return self._requests_cache[unique_key]
+        return await self._get_request(id=unique_key_to_request_id(unique_key))
+
+    async def _get_request(self, id: str) -> Request | None:
+        """Get a request by its id.
+
+        Args:
+            id: The id of the request to get.
+
+        Returns:
+            The request or None if not found.
+        """
+        if id in self._requests_cache:
+            return self._requests_cache[id]

-        response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
+        response = await self._api_client.get_request(id)

         if response is None:
             return None
@@ -205,13 +221,10 @@ async def fetch_next_request(self) -> Request | None:
         await self._ensure_head_is_non_empty()

         while self._head_requests:
-            request_unique_key = self._head_requests.pop()
-            if (
-                request_unique_key not in self._requests_in_progress
-                and request_unique_key not in self._requests_already_handled
-            ):
-                self._requests_in_progress.add(request_unique_key)
-                return await self.get_request(request_unique_key)
+            request_id = self._head_requests.pop()
+            if request_id not in self._requests_in_progress and request_id not in self._requests_already_handled:
+                self._requests_in_progress.add(request_id)
+                return await self._get_request(request_id)
         # No request locally and the ones returned from the platform are already in progress.
         return None

@@ -236,31 +249,24 @@ async def _list_head(self) -> None:

         # Update the cached data
         for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
+            request = _Request.model_validate(request_data)

-            if request.unique_key in self._requests_in_progress:
+            if request.id in self._requests_in_progress:
                 # Ignore requests that are already in progress; we will not process them again.
                 continue
             if request.was_already_handled:
-                # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
-                self._requests_already_handled.add(request.unique_key)
+                # Do not cache fully handled requests; we do not need them. Just cache their id.
+                self._requests_already_handled.add(request.id)
             else:
                 # Only fetch the request if we do not know it yet.
-                if request.unique_key not in self._requests_cache:
-                    request_id = unique_key_to_request_id(request.unique_key)
-                    complete_request_data = await self._api_client.get_request(request_id)
-
-                    if complete_request_data is not None:
-                        request = Request.model_validate(complete_request_data)
-                        self._requests_cache[request.unique_key] = request
-                    else:
-                        logger.warning(
-                            f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
-                        )
+                if request.id not in self._requests_cache:
+                    complete_request_data = await self._api_client.get_request(request_data['id'])
+                    request = _Request.model_validate(complete_request_data)
+                    self._requests_cache[request.id] = request

             # Add new requests to the end of the head, unless already present in the head
-            if request.unique_key not in self._head_requests:
-                self._head_requests.appendleft(request.unique_key)
+            if request.id not in self._head_requests:
+                self._head_requests.appendleft(request.id)

     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as handled after successful processing.
@@ -275,12 +281,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         """
         # Set the handled_at timestamp if not already set.

+        _request = _Request.model_validate(request.model_dump())
+
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
             self.metadata.handled_request_count += 1
             self.metadata.pending_request_count -= 1

-        if cached_request := self._requests_cache.get(request.unique_key):
+        if cached_request := self._requests_cache.get(_request.id):
             cached_request.handled_at = request.handled_at

         try:
@@ -289,13 +297,13 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
            # adding to the queue.)
            processed_request = await self._update_request(request)
            # Remember that we handled this request, to optimize local deduplication.
-            self._requests_already_handled.add(request.unique_key)
+            self._requests_already_handled.add(_request.id)
            # Remove request from cache. It will most likely not be needed.
-            self._requests_cache.pop(request.unique_key)
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_cache.pop(_request.id)
+            self._requests_in_progress.discard(_request.id)

         except Exception as exc:
-            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
+            logger.debug(f'Error marking request {_request.unique_key} as handled: {exc!s}')
             return None
         else:
             return processed_request
@@ -319,24 +327,28 @@ async def reclaim_request(
         """
         # Check if the request was marked as handled and clear it. When reclaiming,
         # we want to put the request back for processing.
+
+        _request = _Request.model_validate(request.model_dump())
+
         if request.was_already_handled:
             request.handled_at = None

         try:
             # Make sure the request is in the local cache. We might need it.
-            self._requests_cache[request.unique_key] = request
+            self._requests_cache[_request.id] = request

             # No longer in progress
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_in_progress.discard(_request.id)
             # No longer handled
-            self._requests_already_handled.discard(request.unique_key)
+            self._requests_already_handled.discard(_request.id)

             if forefront:
                 # Append to the top of the local head estimation
-                self._head_requests.append(request.unique_key)
+                self._head_requests.append(_request.id)

-            processed_request = await self._update_request(request, forefront=forefront)
-            processed_request.unique_key = request.unique_key
+            processed_request = await self._update_request(_request, forefront=forefront)
+            processed_request.id = _request.id
+            processed_request.unique_key = _request.unique_key
             # If the request was previously handled, decrement our handled count since
             # we're putting it back for processing.
             if request.was_already_handled and not processed_request.was_already_handled:
@@ -374,10 +386,9 @@ async def _update_request(
         Returns:
             The updated request.
         """
-        request_dict = request.model_dump(by_alias=True)
-        request_dict['id'] = unique_key_to_request_id(request.unique_key)
+        _request = _Request.model_validate(request.model_dump(by_alias=True))
         response = await self._api_client.update_request(
-            request=request_dict,
+            request=_request.model_dump(by_alias=True),
             forefront=forefront,
         )

@@ -396,10 +407,10 @@ async def _init_caches(self) -> None:
         """
         response = await self._api_client.list_requests(limit=10_000)
         for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
+            request = _Request.model_validate(request_data)
             if request.was_already_handled:
-                # Cache just unique_key for deduplication
-                self._requests_already_handled.add(request.unique_key)
+                # Cache just the id for deduplication
+                self._requests_already_handled.add(request.id)
             else:
                 # Cache the full request
-                self._requests_cache[request.unique_key] = request
+                self._requests_cache[request.id] = request
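
For orientation: the whole diff hinges on `unique_key_to_request_id` being a deterministic mapping from a request's unique key to its platform request id, so that the id-keyed caches deduplicate exactly as the old unique_key-keyed ones did, and on `_Request` being an internal model that carries a computed `id` alongside the standard `Request` fields. Below is a minimal sketch of such a mapping, assuming the id is a truncated, alphanumeric-only encoding of the SHA-256 digest of the unique key; the real helper lives in `apify.storage_clients._apify._utils` and may differ in details.

import base64
import re
from hashlib import sha256


def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
    """Sketch: deterministically derive a request id from a unique key (assumed scheme)."""
    digest = sha256(unique_key.encode('utf-8')).digest()
    encoded = base64.urlsafe_b64encode(digest).decode('ascii')
    # Keep only alphanumeric characters and truncate to the platform id length.
    return re.sub(r'[^A-Za-z0-9]', '', encoded)[:request_id_length]


# Determinism is what makes `self._requests_cache[_request.id]` equivalent to the
# old `self._requests_cache[request.unique_key]` for local deduplication.
assert unique_key_to_request_id('https://example.com') == unique_key_to_request_id('https://example.com')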