Skip to content

Commit 8293dc3

Browse files
Made kombu client manager more robust and efficient
1 parent cd7f781 commit 8293dc3

File tree

2 files changed

+37
-23
lines changed

2 files changed

+37
-23
lines changed

src/socketio/asyncio_aiopika_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async def _publish(self, data):
8686
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
8787
), routing_key='*',
8888
)
89-
return
89+
break
9090
except aio_pika.AMQPException:
9191
if retry:
9292
self._get_logger().error('Cannot publish to rabbitmq... '

src/socketio/kombu_manager.py

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pickle
2+
import time
23
import uuid
34

45
try:
@@ -61,7 +62,7 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
6162
self.exchange_options = exchange_options or {}
6263
self.queue_options = queue_options or {}
6364
self.producer_options = producer_options or {}
64-
self.producer = self._producer()
65+
self.publisher_connection = self._connection()
6566

6667
def initialize(self):
6768
super(KombuManager, self).initialize()
@@ -92,31 +93,44 @@ def _queue(self):
9293
options.update(self.queue_options)
9394
return kombu.Queue(queue_name, self._exchange(), **options)
9495

95-
def _producer(self):
96-
return self._connection().Producer(exchange=self._exchange(),
97-
**self.producer_options)
98-
99-
def __error_callback(self, exception, interval):
100-
self._get_logger().exception('Sleeping {}s'.format(interval))
96+
def _producer_publish(self, connection):
97+
producer = connection.Producer(exchange=self._exchange(),
98+
**self.producer_options)
99+
return connection.ensure(producer, producer.publish)
101100

102101
def _publish(self, data):
103-
connection = self._connection()
104-
publish = connection.ensure(self.producer, self.producer.publish,
105-
errback=self.__error_callback)
106-
publish(pickle.dumps(data))
102+
retry = True
103+
while True:
104+
try:
105+
producer_publish = self._producer_publish(
106+
self.publisher_connection)
107+
producer_publish(pickle.dumps(data))
108+
break
109+
except (OSError, kombu.exceptions.KombuError):
110+
if retry:
111+
self._get_logger().error('Cannot publish to rabbitmq... '
112+
'retrying')
113+
retry = False
114+
else:
115+
self._get_logger().error(
116+
'Cannot publish to rabbitmq... giving up')
117+
break
107118

108119
def _listen(self):
109120
reader_queue = self._queue()
110-
121+
retry_sleep = 1
111122
while True:
112-
connection = self._connection().ensure_connection(
113-
errback=self.__error_callback)
114123
try:
115-
with connection.SimpleQueue(reader_queue) as queue:
116-
while True:
117-
message = queue.get(block=True)
118-
message.ack()
119-
yield message.payload
120-
except connection.connection_errors:
121-
self._get_logger().exception("Connection error "
122-
"while reading from queue")
124+
with self._connection() as connection:
125+
with connection.SimpleQueue(reader_queue) as queue:
126+
while True:
127+
message = queue.get(block=True)
128+
message.ack()
129+
yield message.payload
130+
retry_sleep = 1
131+
except (OSError, kombu.exceptions.KombuError):
132+
self._get_logger().error(
133+
'Cannot receive from rabbitmq... '
134+
'retrying in {} secs'.format(retry_sleep))
135+
time.sleep(retry_sleep)
136+
retry_sleep = min(retry_sleep * 2, 60)

0 commit comments

Comments
 (0)