@@ -23,7 +23,7 @@ use zenoh::{
2323 ConsolidationMode , Parameters , Selector , TimeBound , TimeExpr , TimeRange , ZenohParameters ,
2424 } ,
2525 sample:: { Locality , Sample , SampleKind , SourceSn } ,
26- session:: { EntityGlobalId , EntityId } ,
26+ session:: { ClosingCallbackId , EntityGlobalId , EntityId } ,
2727 Resolvable , Resolve , Session , Wait , KE_ADV_PREFIX , KE_EMPTY , KE_PUB , KE_STAR , KE_STARSTAR ,
2828 KE_SUB ,
2929} ;
@@ -409,6 +409,15 @@ struct State {
409409 callback : Callback < Sample > ,
410410 miss_handlers : HashMap < usize , Callback < Miss > > ,
411411 token : Option < LivelinessToken > ,
412+ closing_callback_id : Option < ClosingCallbackId > ,
413+ }
414+
415+ impl Drop for State {
416+ fn drop ( & mut self ) {
417+ if let Some ( id) = self . closing_callback_id . take ( ) {
418+ self . session . unregister_closing_callback ( id) ;
419+ }
420+ }
412421}
413422
414423#[ zenoh_macros:: unstable]
@@ -683,8 +692,21 @@ impl<Handler> AdvancedSubscriber<Handler> {
683692 callback : callback. clone ( ) ,
684693 miss_handlers : HashMap :: new ( ) ,
685694 token : None ,
695+ closing_callback_id : None ,
686696 } ) ) ;
687697
698+ let closing_callback_id = conf
699+ . session
700+ . register_closing_callback ( {
701+ let statesref = statesref. clone ( ) ;
702+ move || {
703+ let mut state = zlock ! ( statesref) ;
704+ state. callback = Callback :: new ( Arc :: new ( |_| ( ) ) ) ;
705+ }
706+ } )
707+ . map_err ( |_| "session closed" ) ?;
708+ zlock ! ( statesref) . closing_callback_id = Some ( closing_callback_id) ;
709+
688710 let sub_callback = {
689711 let statesref = statesref. clone ( ) ;
690712 let session = conf. session . clone ( ) ;
0 commit comments