1515//
1616//
1717
18- use std :: collections :: HashMap ;
18+ use multimap :: MultiMap ;
1919use std:: time:: Duration ;
2020
2121use tokio:: sync:: { broadcast, mpsc} ;
@@ -55,8 +55,6 @@ pub struct ListenerManagerConfig {
5555#[ derive( Debug , Clone ) ]
5656pub enum CleanupPolicy {
5757 CountBasedOnly ( usize ) ,
58- TimeBasedOnly ( Duration ) ,
59- Hybrid { timeout : Duration , max_count : usize } ,
6058}
6159
6260impl Default for ListenerManagerConfig {
@@ -92,6 +90,13 @@ impl ListenersManager {
9290 pub fn new (
9391 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
9492 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
93+ ) -> Self {
94+ Self :: with_config ( listener_configuration_channel, route_configuration_channel, ListenerManagerConfig :: default ( ) )
95+ }
96+
97+ pub fn with_config (
98+ listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
99+ route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
95100 config : ListenerManagerConfig ,
96101 ) -> Self {
97102 ListenersManager {
@@ -173,7 +178,6 @@ impl ListenersManager {
173178
174179 let listener_info = ListenerInfo :: new ( join_handle, listener_conf, version) ;
175180 self . listener_handles . insert ( listener_name. clone ( ) , listener_info) ;
176-
177181 let version_count = self . listener_handles . get_vec ( & listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
178182 info ! ( "Started version {} of listener {} ({} total active version(s))" , version, listener_name, version_count) ;
179183
@@ -197,48 +201,33 @@ impl ListenersManager {
197201 }
198202
199203 fn cleanup_old_versions ( & mut self , listener_name : & str ) {
200- if let Some ( versions) = self . listener_handles . get_mut ( listener_name) {
204+ if let Some ( mut versions) = self . listener_handles . remove ( listener_name) {
201205 let original_count = versions. len ( ) ;
202206
203207 match & self . config . cleanup_policy {
204208 CleanupPolicy :: CountBasedOnly ( max_count) => {
205209 if versions. len ( ) > * max_count {
206210 let to_remove = versions. len ( ) - max_count;
207- for _ in 0 ..to_remove {
208- let old = versions . remove ( 0 ) ;
211+ let removed = versions . drain ( 0 ..to_remove) . collect :: < Vec < _ > > ( ) ;
212+ for old in removed {
209213 info ! ( "Cleaning up old listener {} version {} (count limit)" , listener_name, old. version) ;
210214 }
211215 }
212216 } ,
213- CleanupPolicy :: TimeBasedOnly ( _timeout) => {
214- // TODO: Implement time-based cleanup when we have connection tracking
215- // For now, behave like count-based with default limit
216- if versions. len ( ) > self . config . max_versions_per_listener {
217- let to_remove = versions. len ( ) - self . config . max_versions_per_listener ;
218- for _ in 0 ..to_remove {
219- let old = versions. remove ( 0 ) ;
220- info ! ( "Cleaning up old listener {} version {} (time limit)" , listener_name, old. version) ;
221- }
222- }
223- } ,
224- CleanupPolicy :: Hybrid { max_count, .. } => {
225- if versions. len ( ) > * max_count {
226- let to_remove = versions. len ( ) - max_count;
227- for _ in 0 ..to_remove {
228- let old = versions. remove ( 0 ) ;
229- info ! ( "Cleaning up old listener {} version {} (hybrid limit)" , listener_name, old. version) ;
230- }
231- }
232- } ,
233217 }
234218
235- let cleaned_count = original_count - versions. len ( ) ;
236- if cleaned_count > 0 {
219+ // Re-insert the remaining versions
220+ for version in versions {
221+ self . listener_handles . insert ( listener_name. to_string ( ) , version) ;
222+ }
223+
224+ let remaining_count = self . listener_handles . get_vec ( listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
225+ if original_count != remaining_count {
237226 info ! (
238- "Cleaned up {} old versions of listener {}, {} versions remaining" ,
239- cleaned_count ,
227+ "Cleaned up {} old version(s) of listener {}, {} remaining" ,
228+ original_count - remaining_count ,
240229 listener_name,
241- versions . len ( )
230+ remaining_count
242231 ) ;
243232 }
244233 }
@@ -277,8 +266,7 @@ mod tests {
277266
278267 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
279268 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
280- let config = ListenerManagerConfig :: default ( ) ;
281- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
269+ let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
282270
283271 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
284272 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -324,8 +312,7 @@ mod tests {
324312
325313 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
326314 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
327- let config = ListenerManagerConfig :: default ( ) ;
328- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
315+ let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
329316
330317 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
331318 let ( secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -375,7 +362,7 @@ mod tests {
375362 cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
376363 cleanup_interval : Duration :: from_secs ( 60 ) ,
377364 } ;
378- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
365+ let mut man = ListenersManager :: with_config ( conf_rx, route_rx, config) ;
379366
380367 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
381368 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -408,7 +395,7 @@ mod tests {
408395 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
409396
410397 // Should only have 2 versions due to cleanup policy (max_count: 2)
411- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 2 ) ;
398+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 2 ) ;
412399
413400 man. stop_listener ( name) . unwrap ( ) ;
414401
@@ -430,7 +417,7 @@ mod tests {
430417 cleanup_policy : CleanupPolicy :: CountBasedOnly ( 3 ) ,
431418 cleanup_interval : Duration :: from_secs ( 60 ) ,
432419 } ;
433- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
420+ let mut man = ListenersManager :: with_config ( conf_rx, route_rx, config) ;
434421
435422 // Add 5 listeners, should only keep 3 due to cleanup policy
436423 for i in 1 ..=5 {
@@ -443,10 +430,10 @@ mod tests {
443430 }
444431
445432 // Should only have 3 versions due to cleanup policy
446- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 3 ) ;
433+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 3 ) ;
447434
448435 man. stop_listener ( name) . unwrap ( ) ;
449- assert ! ( man. listener_handles. get ( name) . is_none( ) ) ;
436+ assert ! ( man. listener_handles. get_vec ( name) . is_none( ) ) ;
450437
451438 tokio:: task:: yield_now ( ) . await ;
452439 }
0 commit comments