11from  __future__ import  annotations 
22
33import  asyncio 
4+ import  contextlib 
45from  typing  import  TYPE_CHECKING , Annotated 
56
67import  websockets .asyncio .client 
2930
3031@docs_group ('Event managers' ) 
3132class  ApifyEventManager (EventManager ):
32-     """A class  for managing Actor events . 
33+     """Event manager  for the Apify platform . 
3334
34-     You shouldn't use this class directly, 
35-     but instead use it via the `Actor.on()` and `Actor.off()` methods. 
36-     """ 
35+     This class extends Crawlee's `EventManager` to provide Apify-specific functionality, including websocket 
36+     connectivity to the Apify platform for receiving platform events. 
37+ 
38+     The event manager handles: 
39+     - Registration and emission of events and their listeners. 
40+     - Websocket connection to Apify platform events. 
41+     - Processing and validation of platform messages. 
42+     - Automatic event forwarding from the platform to local event listeners. 
3743
38-     _platform_events_websocket : websockets .asyncio .client .ClientConnection  |  None  =  None 
39-     _process_platform_messages_task : asyncio .Task  |  None  =  None 
40-     _send_system_info_interval_task : asyncio .Task  |  None  =  None 
41-     _connected_to_platform_websocket : asyncio .Future  =  asyncio .Future ()
44+     This class should not be used directly. Use the `Actor.on` and `Actor.off` methods to interact 
45+     with the event system. 
46+     """ 
4247
43-     def  __init__ (self , config : Configuration , ** kwargs : Unpack [EventManagerOptions ]) ->  None :
44-         """Create an instance of the EventManager . 
48+     def  __init__ (self , configuration : Configuration , ** kwargs : Unpack [EventManagerOptions ]) ->  None :
49+         """Initialize a new instance . 
4550
4651        Args: 
47-             config : The Actor configuration to be used in this  event manager. 
48-             kwargs: Event  manager options - forwarded  to the base  class 
52+             configuration : The Actor configuration for the  event manager. 
53+             ** kwargs: Additional event  manager options passed  to the parent  class.  
4954        """ 
5055        super ().__init__ (** kwargs )
5156
52-         self ._config  =  config 
53-         self ._listener_tasks  =  set ()
54-         self ._connected_to_platform_websocket  =  asyncio .Future [bool ]()
57+         self ._configuration  =  configuration 
58+         """The Actor configuration for the event manager.""" 
59+ 
60+         self ._platform_events_websocket : websockets .asyncio .client .ClientConnection  |  None  =  None 
61+         """WebSocket connection to the platform events.""" 
62+ 
63+         self ._process_platform_messages_task : asyncio .Task  |  None  =  None 
64+         """Task for processing messages from the platform websocket.""" 
65+ 
66+         self ._connected_to_platform_websocket : asyncio .Future [bool ] |  None  =  None 
67+         """Future that resolves when the connection to the platform websocket is established.""" 
5568
5669    @override  
5770    async  def  __aenter__ (self ) ->  Self :
5871        await  super ().__aenter__ ()
5972        self ._connected_to_platform_websocket  =  asyncio .Future ()
6073
6174        # Run tasks but don't await them 
62-         if  self ._config .actor_events_ws_url :
75+         if  self ._configuration .actor_events_ws_url :
6376            self ._process_platform_messages_task  =  asyncio .create_task (
64-                 self ._process_platform_messages (self ._config .actor_events_ws_url )
77+                 self ._process_platform_messages (self ._configuration .actor_events_ws_url )
6578            )
6679            is_connected  =  await  self ._connected_to_platform_websocket 
6780            if  not  is_connected :
@@ -81,16 +94,19 @@ async def __aexit__(
8194        if  self ._platform_events_websocket :
8295            await  self ._platform_events_websocket .close ()
8396
84-         if  self ._process_platform_messages_task :
85-             await  self ._process_platform_messages_task 
97+         if  self ._process_platform_messages_task  and  not  self ._process_platform_messages_task .done ():
98+             self ._process_platform_messages_task .cancel ()
99+             with  contextlib .suppress (asyncio .CancelledError ):
100+                 await  self ._process_platform_messages_task 
86101
87102        await  super ().__aexit__ (exc_type , exc_value , exc_traceback )
88103
89104    async  def  _process_platform_messages (self , ws_url : str ) ->  None :
90105        try :
91106            async  with  websockets .asyncio .client .connect (ws_url ) as  websocket :
92107                self ._platform_events_websocket  =  websocket 
93-                 self ._connected_to_platform_websocket .set_result (True )
108+                 if  self ._connected_to_platform_websocket  is  not   None :
109+                     self ._connected_to_platform_websocket .set_result (True )
94110
95111                async  for  message  in  websocket :
96112                    try :
@@ -110,7 +126,7 @@ async def _process_platform_messages(self, ws_url: str) -> None:
110126                            event = parsed_message .name ,
111127                            event_data = parsed_message .data 
112128                            if  not  isinstance (parsed_message .data , SystemInfoEventData )
113-                             else  parsed_message .data .to_crawlee_format (self ._config .dedicated_cpus  or  1 ),
129+                             else  parsed_message .data .to_crawlee_format (self ._configuration .dedicated_cpus  or  1 ),
114130                        )
115131
116132                        if  parsed_message .name  ==  Event .MIGRATING :
@@ -120,4 +136,5 @@ async def _process_platform_messages(self, ws_url: str) -> None:
120136                        logger .exception ('Cannot parse Actor event' , extra = {'message' : message })
121137        except  Exception :
122138            logger .exception ('Error in websocket connection' )
123-             self ._connected_to_platform_websocket .set_result (False )
139+             if  self ._connected_to_platform_websocket  is  not   None :
140+                 self ._connected_to_platform_websocket .set_result (False )
0 commit comments