Skip to content

Commit 926eced

Browse files
committed
feat: add HTTP Connection Manager drain_timeout support
- Add drain_timeout field with 5000ms default (Envoy compliant) - Integrate with drain signaling and timeout enforcement - Support protocol-specific draining (HTTP/1.1, HTTP/2, TCP) - Add comprehensive test coverage for drain behaviors Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
1 parent 9198984 commit 926eced

File tree

11 files changed

+2752
-103
lines changed

11 files changed

+2752
-103
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orion-configuration/src/config/listener.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,20 @@ use std::{
3838

3939
// Removed unused import
4040

41+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
42+
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
43+
pub enum DrainType {
44+
#[default]
45+
Default,
46+
ModifyOnly,
47+
}
48+
4149
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
4250
pub struct Listener {
4351
pub name: CompactString,
4452
pub address: ListenerAddress,
53+
#[serde(skip_serializing_if = "Option::is_none", default)]
54+
pub version_info: Option<String>,
4555
#[serde(with = "serde_filterchains")]
4656
pub filter_chains: HashMap<FilterChainMatch, FilterChain>,
4757
#[serde(skip_serializing_if = "Option::is_none", default = "Default::default")]
@@ -54,6 +64,8 @@ pub struct Listener {
5464
pub with_tlv_listener_filter: bool,
5565
#[serde(skip_serializing_if = "Option::is_none", default = "Default::default")]
5666
pub tlv_listener_filter_config: Option<super::listener_filters::TlvListenerFilterConfig>,
67+
#[serde(default)]
68+
pub drain_type: DrainType,
5769
}
5870

5971
impl Listener {
@@ -333,7 +345,7 @@ mod envoy_conversions {
333345
use std::hash::{DefaultHasher, Hash, Hasher};
334346
use std::str::FromStr;
335347

336-
use super::{FilterChain, FilterChainMatch, Listener, MainFilter, ServerNameMatch, TlsConfig};
348+
use super::{DrainType, FilterChain, FilterChainMatch, Listener, MainFilter, ServerNameMatch, TlsConfig};
337349
use crate::config::{
338350
common::*,
339351
core::{Address, CidrRange},
@@ -414,7 +426,7 @@ mod envoy_conversions {
414426
per_connection_buffer_limit_bytes,
415427
metadata,
416428
deprecated_v1,
417-
drain_type,
429+
// drain_type,
418430
// listener_filters,
419431
listener_filters_timeout,
420432
continue_on_listener_filters_timeout,
@@ -500,6 +512,7 @@ mod envoy_conversions {
500512
.with_node("socket_options");
501513
}
502514
let bind_device = bind_device.into_iter().next();
515+
let drain_type = DrainType::try_from(drain_type).unwrap_or_default();
503516
Ok(Self {
504517
name,
505518
address,
@@ -509,12 +522,26 @@ mod envoy_conversions {
509522
proxy_protocol_config,
510523
with_tlv_listener_filter,
511524
tlv_listener_filter_config,
525+
drain_type,
526+
version_info: None,
512527
})
513528
}())
514529
.with_name(name)
515530
}
516531
}
517532

533+
impl TryFrom<i32> for DrainType {
534+
type Error = GenericError;
535+
536+
fn try_from(value: i32) -> Result<Self, Self::Error> {
537+
match value {
538+
0 => Ok(DrainType::Default),
539+
1 => Ok(DrainType::ModifyOnly),
540+
_ => Err(GenericError::from_msg(format!("Unknown drain type: {}", value))),
541+
}
542+
}
543+
}
544+
518545
struct FilterChainWrapper((FilterChainMatch, FilterChain));
519546

520547
impl TryFrom<EnvoyFilterChain> for FilterChainWrapper {

orion-configuration/src/config/network_filters/http_connection_manager.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub struct HttpConnectionManager {
5151
#[serde(with = "humantime_serde")]
5252
#[serde(skip_serializing_if = "Option::is_none", default)]
5353
pub request_timeout: Option<Duration>,
54+
#[serde(with = "humantime_serde")]
55+
#[serde(skip_serializing_if = "Option::is_none", default)]
56+
pub drain_timeout: Option<Duration>,
5457
#[serde(skip_serializing_if = "Vec::is_empty", default)]
5558
pub http_filters: Vec<HttpFilter>,
5659
#[serde(skip_serializing_if = "Vec::is_empty", default)]
@@ -564,6 +567,61 @@ mod tests {
564567
assert!(MatchHostScoreLPM::Wildcard < MatchHostScoreLPM::Suffix("foo.bar.test.com".len()));
565568
assert!(MatchHostScoreLPM::Wildcard == MatchHostScoreLPM::Wildcard);
566569
}
570+
571+
#[test]
572+
fn test_drain_timeout_configuration() {
573+
let config = HttpConnectionManager {
574+
codec_type: CodecType::Auto,
575+
route_specifier: RouteSpecifier::RouteConfig(RouteConfiguration {
576+
name: "test_route".into(),
577+
most_specific_header_mutations_wins: false,
578+
response_header_modifier: Default::default(),
579+
request_headers_to_add: vec![],
580+
request_headers_to_remove: vec![],
581+
virtual_hosts: vec![],
582+
}),
583+
http_filters: vec![],
584+
enabled_upgrades: vec![],
585+
access_log: vec![],
586+
xff_settings: Default::default(),
587+
generate_request_id: false,
588+
preserve_external_request_id: false,
589+
always_set_request_id_in_response: false,
590+
tracing: None,
591+
request_timeout: Some(Duration::from_secs(30)),
592+
drain_timeout: Some(Duration::from_secs(10)),
593+
};
594+
595+
assert_eq!(config.drain_timeout, Some(Duration::from_secs(10)));
596+
assert_eq!(config.request_timeout, Some(Duration::from_secs(30)));
597+
}
598+
599+
#[test]
600+
fn test_drain_timeout_default() {
601+
let config = HttpConnectionManager {
602+
codec_type: CodecType::Http1,
603+
route_specifier: RouteSpecifier::RouteConfig(RouteConfiguration {
604+
name: "test_route_default".into(),
605+
most_specific_header_mutations_wins: false,
606+
response_header_modifier: Default::default(),
607+
request_headers_to_add: vec![],
608+
request_headers_to_remove: vec![],
609+
virtual_hosts: vec![],
610+
}),
611+
http_filters: vec![],
612+
enabled_upgrades: vec![],
613+
access_log: vec![],
614+
xff_settings: Default::default(),
615+
generate_request_id: false,
616+
preserve_external_request_id: false,
617+
always_set_request_id_in_response: false,
618+
tracing: None,
619+
request_timeout: None,
620+
drain_timeout: None,
621+
};
622+
623+
assert_eq!(config.drain_timeout, None);
624+
}
567625
}
568626

569627
#[cfg(feature = "envoy-conversions")]
@@ -702,7 +760,7 @@ mod envoy_conversions {
702760
stream_idle_timeout,
703761
// request_timeout,
704762
request_headers_timeout,
705-
drain_timeout,
763+
// drain_timeout,
706764
delayed_close_timeout,
707765
// access_log,
708766
access_log_flush_interval,
@@ -753,6 +811,11 @@ mod envoy_conversions {
753811
.transpose()
754812
.map_err(|_| GenericError::from_msg("failed to convert into Duration"))
755813
.with_node("request_timeout")?;
814+
let drain_timeout = drain_timeout
815+
.map(duration_from_envoy)
816+
.transpose()
817+
.map_err(|_| GenericError::from_msg("failed to convert into Duration"))
818+
.with_node("drain_timeout")?;
756819
let enabled_upgrades = upgrade_configs
757820
.iter()
758821
.filter(|upgrade_config| upgrade_config.enabled.map(|enabled| enabled.value).unwrap_or(true))
@@ -819,6 +882,7 @@ mod envoy_conversions {
819882
enabled_upgrades,
820883
route_specifier,
821884
request_timeout,
885+
drain_timeout,
822886
access_log,
823887
xff_settings,
824888
generate_request_id: generate_request_id.map(|v| v.value).unwrap_or(true),

orion-lib/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ ahash = "0.8.11"
1111
arc-swap = "1.7.1"
1212
arrayvec = "0.7.6"
1313
async-stream = "0.3"
14+
async-trait = "0.1.77"
1415
atomic-time = "0.1.4"
1516
bytes.workspace = true
1617
compact_str.workspace = true
18+
dashmap = "6.0"
1719
enum_dispatch = "0.3.13"
1820
exponential-backoff.workspace = true
1921
futures.workspace = true

0 commit comments

Comments
 (0)