1616//
1717
1818use crate :: { Error , Result } ;
19- use orion_configuration:: config:: listener:: DrainType as ConfigDrainType ;
19+ use orion_configuration:: config:: listener:: { DrainType as ConfigDrainType , FilterChain , MainFilter } ;
2020use pingora_timeout:: fast_timeout:: fast_timeout;
2121use std:: collections:: HashMap ;
2222use std:: sync:: Arc ;
@@ -25,6 +25,13 @@ use tokio::sync::RwLock;
2525use tokio:: time:: sleep;
2626use tracing:: { debug, info, warn} ;
2727
28+ #[ derive( Debug , Clone ) ]
29+ pub enum ListenerProtocolConfig {
30+ Http { drain_timeout : Option < Duration > } ,
31+ Tcp ,
32+ Mixed { http_drain_timeout : Option < Duration > , has_tcp : bool , has_http : bool } ,
33+ }
34+
2835#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
2936pub enum DrainScenario {
3037 HealthCheckFail ,
@@ -46,8 +53,8 @@ impl DrainScenario {
4653pub enum DrainStrategy {
4754 Tcp { global_timeout : Duration } ,
4855 Http { global_timeout : Duration , drain_timeout : Duration } ,
56+ Mixed { global_timeout : Duration , http_drain_timeout : Duration , tcp_connections : bool , http_connections : bool } ,
4957 Immediate ,
50- Gradual ,
5158}
5259
5360#[ derive( Debug , Clone ) ]
@@ -102,9 +109,10 @@ impl ListenerDrainContext {
102109
103110 pub fn is_timeout_exceeded ( & self ) -> bool {
104111 let global_timeout = match & self . strategy {
105- DrainStrategy :: Tcp { global_timeout } | DrainStrategy :: Http { global_timeout, .. } => * global_timeout,
112+ DrainStrategy :: Tcp { global_timeout }
113+ | DrainStrategy :: Http { global_timeout, .. }
114+ | DrainStrategy :: Mixed { global_timeout, .. } => * global_timeout,
106115 DrainStrategy :: Immediate => Duration :: from_secs ( 0 ) ,
107- DrainStrategy :: Gradual => Duration :: from_secs ( 600 ) ,
108116 } ;
109117
110118 self . drain_start . elapsed ( ) >= global_timeout
@@ -113,6 +121,7 @@ impl ListenerDrainContext {
113121 pub fn get_http_drain_timeout ( & self ) -> Option < Duration > {
114122 match & self . strategy {
115123 DrainStrategy :: Http { drain_timeout, .. } => Some ( * drain_timeout) ,
124+ DrainStrategy :: Mixed { http_drain_timeout, .. } => Some ( * http_drain_timeout) ,
116125 _ => None ,
117126 }
118127 }
@@ -126,12 +135,30 @@ pub struct DrainSignalingManager {
126135 listener_drain_state : Arc < RwLock < Option < ListenerDrainState > > > ,
127136}
128137
138+ impl ListenerProtocolConfig {
139+ pub fn from_listener_analysis (
140+ has_http_connection_manager : bool ,
141+ has_tcp_proxy : bool ,
142+ http_drain_timeout : Option < Duration > ,
143+ ) -> Self {
144+ match ( has_http_connection_manager, has_tcp_proxy) {
145+ ( true , true ) => Self :: Mixed { http_drain_timeout, has_tcp : true , has_http : true } ,
146+ ( true , false ) => Self :: Http { drain_timeout : http_drain_timeout } ,
147+ ( false , true ) => Self :: Tcp ,
148+ ( false , false ) => {
149+ warn ! ( "No HTTP connection manager or TCP proxy found in listener, defaulting to TCP draining" ) ;
150+ Self :: Tcp
151+ } ,
152+ }
153+ }
154+ }
155+
129156impl DrainSignalingManager {
130157 pub fn new ( ) -> Self {
131158 Self {
132159 drain_contexts : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
133160 global_drain_timeout : Duration :: from_secs ( 600 ) ,
134- default_http_drain_timeout : Duration :: from_millis ( 5000 ) ,
161+ default_http_drain_timeout : Duration :: from_secs ( 5 ) ,
135162 listener_drain_state : Arc :: new ( RwLock :: new ( None ) ) ,
136163 }
137164 }
@@ -207,17 +234,21 @@ impl DrainSignalingManager {
207234 pub async fn initiate_listener_drain (
208235 & self ,
209236 listener_id : String ,
210- is_http : bool ,
211- http_drain_timeout : Option < Duration > ,
237+ protocol_config : ListenerProtocolConfig ,
212238 active_connections : usize ,
213239 ) -> Result < Arc < ListenerDrainContext > > {
214- let strategy = if is_http {
215- DrainStrategy :: Http {
240+ let strategy = match protocol_config {
241+ ListenerProtocolConfig :: Http { drain_timeout } => DrainStrategy :: Http {
216242 global_timeout : self . global_drain_timeout ,
217- drain_timeout : http_drain_timeout. unwrap_or ( self . default_http_drain_timeout ) ,
218- }
219- } else {
220- DrainStrategy :: Tcp { global_timeout : self . global_drain_timeout }
243+ drain_timeout : drain_timeout. unwrap_or ( self . default_http_drain_timeout ) ,
244+ } ,
245+ ListenerProtocolConfig :: Tcp => DrainStrategy :: Tcp { global_timeout : self . global_drain_timeout } ,
246+ ListenerProtocolConfig :: Mixed { http_drain_timeout, has_tcp, has_http } => DrainStrategy :: Mixed {
247+ global_timeout : self . global_drain_timeout ,
248+ http_drain_timeout : http_drain_timeout. unwrap_or ( self . default_http_drain_timeout ) ,
249+ tcp_connections : has_tcp,
250+ http_connections : has_http,
251+ } ,
221252 } ;
222253
223254 let context = Arc :: new ( ListenerDrainContext :: new ( listener_id. clone ( ) , strategy. clone ( ) , active_connections) ) ;
@@ -326,6 +357,33 @@ impl DrainSignalingManager {
326357 Err ( Error :: new ( "Timeout waiting for listener drain completion" ) )
327358 }
328359 }
360+
361+ pub async fn initiate_listener_drain_from_filter_analysis (
362+ & self ,
363+ listener_id : String ,
364+ filter_chains : & [ FilterChain ] ,
365+ active_connections : usize ,
366+ ) -> Result < Arc < ListenerDrainContext > > {
367+ let mut has_http = false ;
368+ let mut has_tcp = false ;
369+ let mut http_drain_timeout: Option < Duration > = None ;
370+
371+ for filter_chain in filter_chains {
372+ match & filter_chain. terminal_filter {
373+ MainFilter :: Http ( http_config) => {
374+ has_http = true ;
375+ http_drain_timeout = http_config. drain_timeout ;
376+ } ,
377+ MainFilter :: Tcp ( _) => {
378+ has_tcp = true ;
379+ } ,
380+ }
381+ }
382+
383+ let protocol_config = ListenerProtocolConfig :: from_listener_analysis ( has_http, has_tcp, http_drain_timeout) ;
384+
385+ self . initiate_listener_drain ( listener_id, protocol_config, active_connections) . await
386+ }
329387}
330388
331389impl Clone for DrainSignalingManager {
@@ -403,7 +461,8 @@ mod tests {
403461 let manager = DrainSignalingManager :: new ( ) ;
404462 assert ! ( !manager. has_draining_listeners( ) . await ) ;
405463
406- let context = manager. initiate_listener_drain ( "test" . to_string ( ) , false , None , 1 ) . await . unwrap ( ) ;
464+ let context =
465+ manager. initiate_listener_drain ( "test" . to_string ( ) , ListenerProtocolConfig :: Tcp , 1 ) . await . unwrap ( ) ;
407466
408467 assert ! ( manager. has_draining_listeners( ) . await ) ;
409468 assert_eq ! ( manager. get_draining_listeners( ) . await , vec![ "test" ] ) ;
@@ -417,7 +476,14 @@ mod tests {
417476 async fn test_timeout_behavior ( ) {
418477 let manager = DrainSignalingManager :: with_timeouts ( Duration :: from_millis ( 50 ) , Duration :: from_millis ( 25 ) ) ;
419478
420- let context = manager. initiate_listener_drain ( "timeout-test" . to_string ( ) , true , None , 5 ) . await . unwrap ( ) ;
479+ let context = manager
480+ . initiate_listener_drain (
481+ "timeout-test" . to_string ( ) ,
482+ ListenerProtocolConfig :: Http { drain_timeout : None } ,
483+ 5 ,
484+ )
485+ . await
486+ . unwrap ( ) ;
421487
422488 sleep ( Duration :: from_millis ( 10 ) ) . await ;
423489 sleep ( Duration :: from_millis ( 60 ) ) . await ;
@@ -435,4 +501,47 @@ mod tests {
435501 "Expected manager to no longer track the listener after timeout"
436502 ) ;
437503 }
504+
505+ #[ tokio:: test]
506+ async fn test_mixed_protocol_drain_context ( ) {
507+ let strategy = DrainStrategy :: Mixed {
508+ global_timeout : Duration :: from_secs ( 600 ) ,
509+ http_drain_timeout : Duration :: from_secs ( 5 ) ,
510+ tcp_connections : true ,
511+ http_connections : true ,
512+ } ;
513+ let context = ListenerDrainContext :: new ( "test-mixed" . to_string ( ) , strategy, 10 ) ;
514+
515+ assert_eq ! ( context. get_active_connections( ) . await , 10 ) ;
516+ assert ! ( !context. is_completed( ) . await ) ;
517+ assert_eq ! ( context. get_http_drain_timeout( ) , Some ( Duration :: from_secs( 5 ) ) ) ;
518+ assert ! ( !context. is_timeout_exceeded( ) ) ;
519+ }
520+
521+ #[ tokio:: test]
522+ async fn test_listener_protocol_config_analysis ( ) {
523+ let http_config = ListenerProtocolConfig :: from_listener_analysis ( true , false , Some ( Duration :: from_secs ( 10 ) ) ) ;
524+ match http_config {
525+ ListenerProtocolConfig :: Http { drain_timeout } => {
526+ assert_eq ! ( drain_timeout, Some ( Duration :: from_secs( 10 ) ) ) ;
527+ } ,
528+ _ => panic ! ( "Expected HTTP config" ) ,
529+ }
530+
531+ let tcp_config = ListenerProtocolConfig :: from_listener_analysis ( false , true , None ) ;
532+ match tcp_config {
533+ ListenerProtocolConfig :: Tcp => { } ,
534+ _ => panic ! ( "Expected TCP config" ) ,
535+ }
536+
537+ let mixed_config = ListenerProtocolConfig :: from_listener_analysis ( true , true , Some ( Duration :: from_secs ( 3 ) ) ) ;
538+ match mixed_config {
539+ ListenerProtocolConfig :: Mixed { http_drain_timeout, has_tcp, has_http } => {
540+ assert_eq ! ( http_drain_timeout, Some ( Duration :: from_secs( 3 ) ) ) ;
541+ assert ! ( has_tcp) ;
542+ assert ! ( has_http) ;
543+ } ,
544+ _ => panic ! ( "Expected Mixed config" ) ,
545+ }
546+ }
438547}
0 commit comments