Skip to content

Commit cd7f781

Browse files
Made aio_pika client manager more robust and efficient (Fixes #1142)
1 parent 270eb37 commit cd7f781

File tree

1 file changed

+59
-39
lines changed

1 file changed

+59
-39
lines changed

src/socketio/asyncio_aiopika_manager.py

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
4444
'(Run "pip install aio_pika" in your '
4545
'virtualenv).')
4646
self.url = url
47-
self.listener_connection = None
48-
self.listener_channel = None
49-
self.listener_queue = None
47+
self._lock = asyncio.Lock()
48+
self.publisher_connection = None
49+
self.publisher_channel = None
50+
self.publisher_exchange = None
5051
super().__init__(channel=channel, write_only=write_only, logger=logger)
5152

5253
async def _connection(self):
@@ -66,41 +67,60 @@ async def _queue(self, channel, exchange):
6667
return queue
6768

6869
async def _publish(self, data):
69-
connection = await self._connection()
70-
channel = await self._channel(connection)
71-
exchange = await self._exchange(channel)
72-
await exchange.publish(
73-
aio_pika.Message(body=pickle.dumps(data),
74-
delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
75-
routing_key='*'
76-
)
77-
78-
async def _listen(self):
79-
retry_sleep = 1
80-
while True:
81-
try:
82-
if self.listener_connection is None:
83-
self.listener_connection = await self._connection()
84-
self.listener_channel = await self._channel(
85-
self.listener_connection
70+
if self.publisher_connection is None:
71+
async with self._lock:
72+
if self.publisher_connection is None:
73+
self.publisher_connection = await self._connection()
74+
self.publisher_channel = await self._channel(
75+
self.publisher_connection
8676
)
87-
await self.listener_channel.set_qos(prefetch_count=1)
88-
exchange = await self._exchange(self.listener_channel)
89-
self.listener_queue = await self._queue(
90-
self.listener_channel, exchange
77+
self.publisher_exchange = await self._exchange(
78+
self.publisher_channel
9179
)
92-
retry_sleep = 1
93-
94-
async with self.listener_queue.iterator() as queue_iter:
95-
async for message in queue_iter:
96-
async with message.process():
97-
yield pickle.loads(message.body)
98-
except Exception:
99-
self._get_logger().error('Cannot receive from rabbitmq... '
100-
'retrying in '
101-
'{} secs'.format(retry_sleep))
102-
self.listener_connection = None
103-
await asyncio.sleep(retry_sleep)
104-
retry_sleep *= 2
105-
if retry_sleep > 60:
106-
retry_sleep = 60
80+
retry = True
81+
while True:
82+
try:
83+
await self.publisher_exchange.publish(
84+
aio_pika.Message(
85+
body=pickle.dumps(data),
86+
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
87+
), routing_key='*',
88+
)
89+
return
90+
except aio_pika.AMQPException:
91+
if retry:
92+
self._get_logger().error('Cannot publish to rabbitmq... '
93+
'retrying')
94+
retry = False
95+
else:
96+
self._get_logger().error(
97+
'Cannot publish to rabbitmq... giving up')
98+
break
99+
except aio_pika.exceptions.ChannelInvalidStateError:
100+
# aio_pika raises this exception when the task is cancelled
101+
raise asyncio.CancelledError()
102+
103+
async def _listen(self):
104+
async with (await self._connection()) as connection:
105+
channel = await self._channel(connection)
106+
await channel.set_qos(prefetch_count=1)
107+
exchange = await self._exchange(channel)
108+
queue = await self._queue(channel, exchange)
109+
110+
retry_sleep = 1
111+
while True:
112+
try:
113+
async with queue.iterator() as queue_iter:
114+
async for message in queue_iter:
115+
async with message.process():
116+
yield pickle.loads(message.body)
117+
retry_sleep = 1
118+
except aio_pika.AMQPException:
119+
self._get_logger().error(
120+
'Cannot receive from rabbitmq... '
121+
'retrying in {} secs'.format(retry_sleep))
122+
await asyncio.sleep(retry_sleep)
123+
retry_sleep = min(retry_sleep * 2, 60)
124+
except aio_pika.exceptions.ChannelInvalidStateError:
125+
# aio_pika raises this exception when the task is cancelled
126+
raise asyncio.CancelledError()

0 commit comments

Comments
 (0)