Skip to content

Commit 12134bd

Browse files
Prevent pubsub managers from ever crashing
Fixes #1262
1 parent 2f07824 commit 12134bd

File tree

3 files changed

+49
-41
lines changed

3 files changed

+49
-41
lines changed

src/socketio/async_pubsub_manager.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,13 @@ async def _thread(self):
230230
await self._handle_close_room(data)
231231
except asyncio.CancelledError:
232232
raise # let the outer try/except handle it
233-
except:
233+
except Exception:
234234
self.server.logger.exception(
235-
'Unknown error in pubsub listening task')
235+
'Handler error in pubsub listening thread')
236+
self.server.logger.error('pubsub listen() exited unexpectedly')
237+
break # loop should never exit except in unit tests!
236238
except asyncio.CancelledError: # pragma: no cover
237239
break
238-
except: # pragma: no cover
239-
import traceback
240-
traceback.print_exc()
240+
except Exception: # pragma: no cover
241+
self.server.logger.exception('Unexpected Error in pubsub '
242+
'listening thread')

src/socketio/pubsub_manager.py

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -188,38 +188,45 @@ def _handle_close_room(self, message):
188188
namespace=message.get('namespace'))
189189

190190
def _thread(self):
191-
for message in self._listen():
192-
data = None
193-
if isinstance(message, dict):
194-
data = message
195-
else:
196-
if isinstance(message, bytes): # pragma: no cover
197-
try:
198-
data = pickle.loads(message)
199-
except:
200-
pass
201-
if data is None:
202-
try:
203-
data = json.loads(message)
204-
except:
205-
pass
206-
if data and 'method' in data:
207-
self._get_logger().debug('pubsub message: {}'.format(
208-
data['method']))
209-
try:
210-
if data['method'] == 'callback':
211-
self._handle_callback(data)
212-
elif data.get('host_id') != self.host_id:
213-
if data['method'] == 'emit':
214-
self._handle_emit(data)
215-
elif data['method'] == 'disconnect':
216-
self._handle_disconnect(data)
217-
elif data['method'] == 'enter_room':
218-
self._handle_enter_room(data)
219-
elif data['method'] == 'leave_room':
220-
self._handle_leave_room(data)
221-
elif data['method'] == 'close_room':
222-
self._handle_close_room(data)
223-
except:
224-
self.server.logger.exception(
225-
'Unknown error in pubsub listening thread')
191+
while True:
192+
try:
193+
for message in self._listen():
194+
data = None
195+
if isinstance(message, dict):
196+
data = message
197+
else:
198+
if isinstance(message, bytes): # pragma: no cover
199+
try:
200+
data = pickle.loads(message)
201+
except:
202+
pass
203+
if data is None:
204+
try:
205+
data = json.loads(message)
206+
except:
207+
pass
208+
if data and 'method' in data:
209+
self._get_logger().debug('pubsub message: {}'.format(
210+
data['method']))
211+
try:
212+
if data['method'] == 'callback':
213+
self._handle_callback(data)
214+
elif data.get('host_id') != self.host_id:
215+
if data['method'] == 'emit':
216+
self._handle_emit(data)
217+
elif data['method'] == 'disconnect':
218+
self._handle_disconnect(data)
219+
elif data['method'] == 'enter_room':
220+
self._handle_enter_room(data)
221+
elif data['method'] == 'leave_room':
222+
self._handle_leave_room(data)
223+
elif data['method'] == 'close_room':
224+
self._handle_close_room(data)
225+
except Exception:
226+
self.server.logger.exception(
227+
'Handler error in pubsub listening thread')
228+
self.server.logger.error('pubsub listen() exited unexpectedly')
229+
break # loop should never exit except in unit tests!
230+
except Exception: # pragma: no cover
231+
self.server.logger.exception('Unexpected Error in pubsub '
232+
'listening thread')

tests/async/test_pubsub_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,6 @@ async def messages():
541541
'host_id': host_id}
542542
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
543543
'host_id': host_id})
544-
raise asyncio.CancelledError() # force the thread to exit
545544

546545
self.pm._listen = messages
547546
_run(self.pm._thread())

0 commit comments

Comments
 (0)