1515//
1616//
1717
18- use multimap:: MultiMap ;
18+ use std:: collections:: HashMap ;
19+ use std:: time:: Duration ;
20+
1921use tokio:: sync:: { broadcast, mpsc} ;
2022use tracing:: { info, warn} ;
2123
@@ -43,6 +45,30 @@ pub enum TlsContextChange {
4345 Updated ( ( String , TransportSecret ) ) ,
4446}
4547
48+ #[ derive( Debug , Clone ) ]
49+ pub struct ListenerManagerConfig {
50+ pub max_versions_per_listener : usize ,
51+ pub cleanup_policy : CleanupPolicy ,
52+ pub cleanup_interval : Duration ,
53+ }
54+
55+ #[ derive( Debug , Clone ) ]
56+ pub enum CleanupPolicy {
57+ CountBasedOnly ( usize ) ,
58+ TimeBasedOnly ( Duration ) ,
59+ Hybrid { timeout : Duration , max_count : usize } ,
60+ }
61+
62+ impl Default for ListenerManagerConfig {
63+ fn default ( ) -> Self {
64+ Self {
65+ max_versions_per_listener : 2 ,
66+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
67+ cleanup_interval : Duration :: from_secs ( 60 ) ,
68+ }
69+ }
70+ }
71+
4672struct ListenerInfo {
4773 handle : abort_on_drop:: ChildTask < ( ) > ,
4874 listener_conf : ListenerConfig ,
@@ -59,18 +85,21 @@ pub struct ListenersManager {
5985 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
6086 listener_handles : MultiMap < String , ListenerInfo > ,
6187 version_counter : u64 ,
88+ config : ListenerManagerConfig ,
6289}
6390
6491impl ListenersManager {
6592 pub fn new (
6693 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
6794 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
95+ config : ListenerManagerConfig ,
6896 ) -> Self {
6997 ListenersManager {
7098 listener_configuration_channel,
7199 route_configuration_channel,
72100 listener_handles : MultiMap :: new ( ) ,
73101 version_counter : 0 ,
102+ config,
74103 }
75104 }
76105
@@ -148,6 +177,8 @@ impl ListenersManager {
148177 let version_count = self . listener_handles . get_vec ( & listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
149178 info ! ( "Started version {} of listener {} ({} total active version(s))" , version, listener_name, version_count) ;
150179
180+ self . cleanup_old_versions ( & listener_name) ;
181+
151182 Ok ( ( ) )
152183 }
153184
@@ -164,6 +195,54 @@ impl ListenersManager {
164195
165196 Ok ( ( ) )
166197 }
198+
199+ fn cleanup_old_versions ( & mut self , listener_name : & str ) {
200+ if let Some ( versions) = self . listener_handles . get_mut ( listener_name) {
201+ let original_count = versions. len ( ) ;
202+
203+ match & self . config . cleanup_policy {
204+ CleanupPolicy :: CountBasedOnly ( max_count) => {
205+ if versions. len ( ) > * max_count {
206+ let to_remove = versions. len ( ) - max_count;
207+ for _ in 0 ..to_remove {
208+ let old = versions. remove ( 0 ) ;
209+ info ! ( "Cleaning up old listener {} version {} (count limit)" , listener_name, old. version) ;
210+ }
211+ }
212+ } ,
213+ CleanupPolicy :: TimeBasedOnly ( _timeout) => {
214+ // TODO: Implement time-based cleanup when we have connection tracking
215+ // For now, behave like count-based with default limit
216+ if versions. len ( ) > self . config . max_versions_per_listener {
217+ let to_remove = versions. len ( ) - self . config . max_versions_per_listener ;
218+ for _ in 0 ..to_remove {
219+ let old = versions. remove ( 0 ) ;
220+ info ! ( "Cleaning up old listener {} version {} (time limit)" , listener_name, old. version) ;
221+ }
222+ }
223+ } ,
224+ CleanupPolicy :: Hybrid { max_count, .. } => {
225+ if versions. len ( ) > * max_count {
226+ let to_remove = versions. len ( ) - max_count;
227+ for _ in 0 ..to_remove {
228+ let old = versions. remove ( 0 ) ;
229+ info ! ( "Cleaning up old listener {} version {} (hybrid limit)" , listener_name, old. version) ;
230+ }
231+ }
232+ } ,
233+ }
234+
235+ let cleaned_count = original_count - versions. len ( ) ;
236+ if cleaned_count > 0 {
237+ info ! (
238+ "Cleaned up {} old versions of listener {}, {} versions remaining" ,
239+ cleaned_count,
240+ listener_name,
241+ versions. len( )
242+ ) ;
243+ }
244+ }
245+ }
167246}
168247
169248#[ cfg( test) ]
@@ -198,7 +277,8 @@ mod tests {
198277
199278 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
200279 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
201- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
280+ let config = ListenerManagerConfig :: default ( ) ;
281+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
202282
203283 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
204284 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -244,7 +324,8 @@ mod tests {
244324
245325 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
246326 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
247- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
327+ let config = ListenerManagerConfig :: default ( ) ;
328+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
248329
249330 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
250331 let ( secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -289,7 +370,12 @@ mod tests {
289370
290371 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
291372 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
292- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
373+ let config = ListenerManagerConfig {
374+ max_versions_per_listener : 2 ,
375+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
376+ cleanup_interval : Duration :: from_secs ( 60 ) ,
377+ } ;
378+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
293379
294380 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
295381 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -315,16 +401,53 @@ mod tests {
315401 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
316402 tokio:: task:: yield_now ( ) . await ;
317403
318- assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
404+ // After adding 3rd listener, first should be cleaned up due to max_versions_per_listener = 2
405+ // So routeb_tx1 should be closed (is_err), but routeb_tx2 and routeb_tx3 should work
406+ assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_err( ) ) ;
319407 assert ! ( routeb_tx2. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
320408 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
321409
322- assert_eq ! ( man. listener_handles. get_vec( name) . unwrap( ) . len( ) , 3 ) ;
410+ // Should only have 2 versions due to cleanup policy (max_count: 2)
411+ assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 2 ) ;
323412
324413 man. stop_listener ( name) . unwrap ( ) ;
325414
326415 assert ! ( man. listener_handles. get_vec( name) . is_none( ) ) ;
327416
328417 tokio:: task:: yield_now ( ) . await ;
329418 }
419+
420+ #[ traced_test]
421+ #[ tokio:: test]
422+ async fn test_cleanup_policy_enforcement ( ) {
423+ let chan = 10 ;
424+ let name = "cleanup-test-listener" ;
425+
426+ let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
427+ let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
428+ let config = ListenerManagerConfig {
429+ max_versions_per_listener : 3 ,
430+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 3 ) ,
431+ cleanup_interval : Duration :: from_secs ( 60 ) ,
432+ } ;
433+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
434+
435+ // Add 5 listeners, should only keep 3 due to cleanup policy
436+ for i in 1 ..=5 {
437+ let ( _routeb_tx, routeb_rx) = broadcast:: channel ( chan) ;
438+ let ( _secb_tx, secb_rx) = broadcast:: channel ( chan) ;
439+ let listener = Listener :: test_listener ( name, routeb_rx, secb_rx) ;
440+ let listener_info = create_test_listener_config ( name, 1230 + i) ;
441+ man. start_listener ( listener, listener_info) . unwrap ( ) ;
442+ tokio:: task:: yield_now ( ) . await ;
443+ }
444+
445+ // Should only have 3 versions due to cleanup policy
446+ assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 3 ) ;
447+
448+ man. stop_listener ( name) . unwrap ( ) ;
449+ assert ! ( man. listener_handles. get( name) . is_none( ) ) ;
450+
451+ tokio:: task:: yield_now ( ) . await ;
452+ }
330453}
0 commit comments