@@ -280,6 +280,47 @@ def subscribe(self, topic: Topic, callback: Coroutine):
280
280
future .set_result (None )
281
281
return future
282
282
283
+ def unsubscribe (self , topic : Topic ):
284
+ """
285
+ Unsubscribe for events
286
+
287
+ Args:
288
+ topic (Topic): the identifier of the event topic to be unsubscribed.
289
+ Note: You can use ALL_TOPICS (event_notifier.ALL_TOPICS) to unsubscribe all topics
290
+
291
+ Returns:
292
+ Coroutine: awaitable task to subscribe to topic if connected.
293
+ """
294
+ # Create none-future which can be safely awaited
295
+ # but which also will not give warnings
296
+ # if it isn't awaited. This is returned
297
+ # on code paths which do not make RPC calls.
298
+ none_future = asyncio .Future ()
299
+ none_future .set_result (None )
300
+
301
+ # Handle unsubscribing ALL_TOPICS
302
+ if topic is ALL_TOPICS and not self ._topics :
303
+ logger .warning (f"Cannot unsubscribe 'ALL_TOPICS'. No topics are subscribed." )
304
+ return none_future
305
+ elif topic is not ALL_TOPICS and topic not in self ._topics :
306
+ logger .warning (f"Cannot unsubscribe topic '{ topic } ' which is not subscribed." )
307
+ return none_future
308
+ elif topic is ALL_TOPICS and self ._topics :
309
+ logger .debug (f"Unsubscribing all topics: { self ._topics } " )
310
+ # remove all topics and callbacks
311
+ self ._topics .clear ()
312
+ self ._callbacks .clear ()
313
+ elif topic is not ALL_TOPICS and topic in self ._topics :
314
+ logger .debug (f"Unsubscribing topic '{ topic } '" )
315
+ self ._topics .remove (topic )
316
+ self ._callbacks .pop (topic , None )
317
+
318
+ if self .is_ready ():
319
+ topics = list (self ._topics ) if topic is ALL_TOPICS else [topic ]
320
+ return self ._rpc_channel .other .unsubscribe (topics = topics )
321
+ else :
322
+ return none_future
323
+
283
324
async def publish (
284
325
self , topics : TopicList , data = None , sync = True , notifier_id = None
285
326
) -> bool :
0 commit comments