@@ -55,7 +55,7 @@ def __init__(self):
5555 self .queue = asyncio .Queue (maxsize = settings .PROXY_QUEUE_SIZE )
5656
5757 async def exec (self ):
58- await asyncio .wait ( [
58+ await asyncio .gather ( * [
5959 self .producer (),
6060 self .consumer (),
6161 ])
@@ -74,7 +74,7 @@ async def consumer(self):
7474 i += 1
7575
7676 if tasks :
77- await asyncio .wait ( tasks )
77+ await asyncio .gather ( * tasks )
7878 tasks .clear ()
7979 except KeyboardInterrupt :
8080 raise
@@ -91,7 +91,7 @@ async def producer(self):
9191 collector_states = await db .execute (
9292 CollectorState .select ().where (
9393 CollectorState .last_processing_time < time .time () - CollectorState .processing_period
94- )
94+ ). limit ( settings . CONCURRENT_TASKS_COUNT )
9595 )
9696
9797 tasks = [
@@ -100,7 +100,7 @@ async def producer(self):
100100 ]
101101
102102 if tasks :
103- await asyncio .wait ( tasks )
103+ await asyncio .gather ( * tasks )
104104 tasks .clear ()
105105
106106 # check proxies
@@ -193,11 +193,11 @@ async def process_raw_proxies(self, proxies, collector_id):
193193 for proxy in proxies :
194194 tasks .append (self .process_raw_proxy (proxy , collector_id ))
195195 if len (tasks ) > settings .CONCURRENT_TASKS_COUNT :
196- await asyncio .wait ( tasks )
196+ await asyncio .gather ( * tasks )
197197 tasks .clear ()
198198
199199 if tasks :
200- await asyncio .wait ( tasks )
200+ await asyncio .gather ( * tasks )
201201
202202 async def process_raw_proxy (self , proxy , collector_id ):
203203 self .logger .debug ("adding raw proxy \" {}\" to queue" .format (proxy ))
0 commit comments