Skip to content

Commit 019f532

Browse files
committed
Add unsubscribe support
1 parent 6414ca7 commit 019f532

File tree

1 file changed

+43
-0
lines changed

1 file changed

+43
-0
lines changed

fastapi_websocket_pubsub/pub_sub_client.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,49 @@ def subscribe(self, topic: Topic, callback: Coroutine):
280280
future.set_result(None)
281281
return future
282282

283+
def unsubscribe(self, topic: Topic):
284+
"""
285+
Unsubscribe for events
286+
287+
Args:
288+
topic (Topic): the identifier of the event topic to be unsubscribed.
289+
Note: You can use ALL_TOPICS (event_notifier.ALL_TOPICS) to unsubscribe all topics
290+
291+
Returns:
292+
Coroutine: awaitable task to subscribe to topic if connected.
293+
"""
294+
# Create none-future which can be safely awaited
295+
# but which also will not give warnings
296+
# if it isn't awaited. This is returned
297+
# on code paths which do not make RPC calls.
298+
none_future = asyncio.Future()
299+
none_future.set_result(None)
300+
301+
# Topics to potentially make RPC calls about
302+
topics = list(self._topics) if topic is ALL_TOPICS else [topic]
303+
304+
# Handle ALL_TOPICS or specific topics
305+
if topic is ALL_TOPICS and not self._topics:
306+
logger.warning(f"Cannot unsubscribe 'ALL_TOPICS'. No topics are subscribed.")
307+
return none_future
308+
elif topic is not ALL_TOPICS and topic not in self._topics:
309+
logger.warning(f"Cannot unsubscribe topic '{topic}' which is not subscribed.")
310+
return none_future
311+
elif topic is ALL_TOPICS and self._topics:
312+
logger.debug(f"Unsubscribing all topics: {self._topics}")
313+
# remove all topics and callbacks
314+
self._topics.clear()
315+
self._callbacks.clear()
316+
elif topic is not ALL_TOPICS and topic in self._topics:
317+
logger.debug(f"Unsubscribing topic '{topic}'")
318+
self._topics.remove(topic)
319+
self._callbacks.pop(topic, None)
320+
321+
if self.is_ready():
322+
return self._rpc_channel.other.unsubscribe(topics=topics)
323+
else:
324+
return none_future
325+
283326
async def publish(
284327
self, topics: TopicList, data=None, sync=True, notifier_id=None
285328
) -> bool:

0 commit comments

Comments
 (0)