From 236067934e51ff42275609d80b8af465a19fd5a2 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Thu, 18 Dec 2025 11:09:18 +0100 Subject: [PATCH 1/5] feat(mgmt): add ConfigClient The ConfigClient is the entity used to interact with the config processor. This abstraction will allow us to simplify configuration entry points (grpc/k8s or any other), hiding the internals of the communication that are now unnecessarily exposed. This is the first commit in a series to simplify the mgmt crate. Signed-off-by: Fredi Raspall --- mgmt/src/processor/mgmt_client.rs | 140 ++++++++++++++++++++++++++++++ mgmt/src/processor/mod.rs | 1 + 2 files changed, 141 insertions(+) create mode 100644 mgmt/src/processor/mgmt_client.rs diff --git a/mgmt/src/processor/mgmt_client.rs b/mgmt/src/processor/mgmt_client.rs new file mode 100644 index 000000000..49038c7ef --- /dev/null +++ b/mgmt/src/processor/mgmt_client.rs @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Interface to management processor + +#![allow(unused)] // TEMPORARY + +use config::ConfigError; +use config::ConfigResult; +use config::GenId; +use config::GwConfig; +use config::internal::status::DataplaneStatus; + +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; +use tokio::sync::oneshot::Receiver; +#[allow(unused)] +use tracing::{debug, error, info}; + +use thiserror::Error; + +/// A request type to the `ConfigProcessor` +#[derive(Debug)] +pub enum ConfigRequest { + ApplyConfig(Box), + GetCurrentConfig, + GetGeneration, + GetDataplaneStatus, +} + +/// A response from the `ConfigProcessor` +#[derive(Debug)] +pub enum ConfigResponse { + ApplyConfig(ConfigResult), + GetCurrentConfig(Box>), + GetGeneration(Option), + GetDataplaneStatus(Box), +} +type ConfigResponseChannel = oneshot::Sender; + +/// A type that includes a request to the `ConfigProcessor` and a channel to +/// issue the response back +pub struct ConfigChannelRequest { + pub(crate) request: ConfigRequest, /* a request to the mgmt processor */ + pub(crate) reply_tx: ConfigResponseChannel, /* the one-shot channel to respond */ +} +impl ConfigChannelRequest { + #[must_use] + pub fn new(request: ConfigRequest) -> (Self, Receiver) { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = Self { request, reply_tx }; + (request, reply_rx) + } +} + +/// The type of errors that can happen when issuing requests to a [`ConfigProcessor`] +#[derive(Error, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum ConfigProcessorError { + #[error("Failure sending request to config processor: {0}")] + SendRequestError(#[from] tokio::sync::mpsc::error::SendError), + #[error("Failure receiving response from config processor: {0}")] + RecvResponseError(#[from] tokio::sync::oneshot::error::RecvError), + #[error("Failure applying config: {0}")] + ApplyConfigError(#[from] ConfigError), + #[error("No configuration is applied")] + NoConfigApplied, +} + +/// A cloneable object that allows sending requests to a [`ConfigProcessor`]. +#[derive(Clone)] +pub struct ConfigClient { + tx: Sender, +} + +impl ConfigClient { + #[must_use] + pub fn new(channel_tx: Sender) -> Self { + Self { tx: channel_tx } + } + + /// Apply the provided `GwConfig` + /// + /// # Errors + /// This method returns `ConfigProcessorError` if the config request could not be sent, the response + /// could not be received or the response was a failure. + pub async fn apply_config(&self, gwconfig: GwConfig) -> Result<(), ConfigProcessorError> { + let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(Box::new(gwconfig))); + self.tx.send(req).await?; + match rx.await? { + ConfigResponse::ApplyConfig(Err(e)) => Err(e.into()), + ConfigResponse::ApplyConfig(Ok(())) => Ok(()), + _ => unreachable!(), + } + } + + /// Get the config currently applied. + /// + /// # Errors + /// This method returns `ConfigProcessorError` if the request could not be sent or the response + /// could not be received. + pub async fn get_current_config(&self) -> Result { + let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetCurrentConfig); + self.tx.send(req).await?; + let gwconfig = match rx.await? { + ConfigResponse::GetCurrentConfig(opt_config) => opt_config, + _ => unreachable!(), + }; + gwconfig.ok_or(ConfigProcessorError::NoConfigApplied) + } + + /// Apply the generation id of the configuration currently applied. + /// + /// # Errors + /// This method returns `ConfigProcessorError` if the config request could not be sent or the response + /// could not be received. + pub async fn get_generation(&self) -> Result { + let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetGeneration); + self.tx.send(req).await?; + let genid = match rx.await? { + ConfigResponse::GetGeneration(genid) => genid, + _ => unreachable!(), + }; + genid.ok_or(ConfigProcessorError::NoConfigApplied) + } + + /// Retrieve the current status of dataplane. + /// + /// # Errors + /// This method returns `ConfigProcessorError` if the config request could not be sent or the response + /// could not be received. + pub async fn get_status(&self) -> Result { + let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus); + self.tx.send(req).await?; + match rx.await? { + ConfigResponse::GetDataplaneStatus(status) => Ok(*status), + _ => unreachable!(), + } + } +} diff --git a/mgmt/src/processor/mod.rs b/mgmt/src/processor/mod.rs index 18d394bfd..05fff069f 100644 --- a/mgmt/src/processor/mod.rs +++ b/mgmt/src/processor/mod.rs @@ -9,4 +9,5 @@ mod display; pub(crate) mod gwconfigdb; pub(crate) mod k8s_client; pub(crate) mod launch; +pub(crate) mod mgmt_client; pub(crate) mod proc; From 1f4399c99f49e98cfe1b7b9d71f396ea5ec9b307 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Thu, 18 Dec 2025 12:25:28 +0100 Subject: [PATCH 2/5] feat(mgmt): switch to defs in mgmt_client Signed-off-by: Fredi Raspall --- mgmt/src/grpc/server.rs | 2 +- mgmt/src/processor/k8s_client.rs | 2 +- mgmt/src/processor/launch.rs | 2 +- mgmt/src/processor/proc.rs | 37 +------------------------------- 4 files changed, 4 insertions(+), 39 deletions(-) diff --git a/mgmt/src/grpc/server.rs b/mgmt/src/grpc/server.rs index a18a1ea28..e0b13520b 100644 --- a/mgmt/src/grpc/server.rs +++ b/mgmt/src/grpc/server.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use tonic::{Request, Response, Status}; use tracing::{debug, error}; -use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; +use crate::processor::mgmt_client::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; use config::converters::grpc::{ convert_dataplane_status_to_grpc, convert_gateway_config_from_grpc_with_defaults, }; diff --git a/mgmt/src/processor/k8s_client.rs b/mgmt/src/processor/k8s_client.rs index 4631043a7..540df617b 100644 --- a/mgmt/src/processor/k8s_client.rs +++ b/mgmt/src/processor/k8s_client.rs @@ -17,7 +17,7 @@ use k8s_intf::gateway_agent_crd::{ }; use tracing::{debug, error}; -use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; +use crate::processor::mgmt_client::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; #[derive(Debug, thiserror::Error)] pub enum K8sClientError { diff --git a/mgmt/src/processor/launch.rs b/mgmt/src/processor/launch.rs index 2275776f9..566f0174b 100644 --- a/mgmt/src/processor/launch.rs +++ b/mgmt/src/processor/launch.rs @@ -2,7 +2,7 @@ // Copyright Open Network Fabric Authors use crate::processor::k8s_client::{K8sClient, K8sClientError}; -use crate::processor::proc::ConfigChannelRequest; +use crate::processor::mgmt_client::ConfigChannelRequest; use crate::processor::proc::ConfigProcessor; use std::fmt::Display; diff --git a/mgmt/src/processor/proc.rs b/mgmt/src/processor/proc.rs index ff106605f..a65228eee 100644 --- a/mgmt/src/processor/proc.rs +++ b/mgmt/src/processor/proc.rs @@ -9,8 +9,6 @@ use std::collections::{HashMap, HashSet}; use tokio::spawn; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot; -use tokio::sync::oneshot::Receiver; use config::external::overlay::vpc::VpcTable; use config::internal::status::{ @@ -30,6 +28,7 @@ use pkt_meta::dst_vpcd_lookup::setup::build_dst_vni_lookup_configuration; use crate::processor::display::GwConfigDatabaseSummary; use crate::processor::gwconfigdb::GwConfigDatabase; +use crate::processor::mgmt_client::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; use crate::vpc_manager::{RequiredInformationBase, VpcManager}; use rekon::{Observe, Reconcile}; @@ -45,40 +44,6 @@ use stats::VpcStatsStore; use vpcmap::VpcDiscriminant; use vpcmap::map::{VpcMap, VpcMapWriter}; -/// A request type to the `ConfigProcessor` -#[derive(Debug)] -pub enum ConfigRequest { - ApplyConfig(Box), - GetCurrentConfig, - GetGeneration, - GetDataplaneStatus, -} - -/// A response from the `ConfigProcessor` -#[derive(Debug)] -pub enum ConfigResponse { - ApplyConfig(ConfigResult), - GetCurrentConfig(Box>), - GetGeneration(Option), - GetDataplaneStatus(Box), -} -type ConfigResponseChannel = oneshot::Sender; - -/// A type that includes a request to the `ConfigProcessor` and a channel to -/// issue the response back -pub struct ConfigChannelRequest { - request: ConfigRequest, /* a request to the mgmt processor */ - reply_tx: ConfigResponseChannel, /* the one-shot channel to respond */ -} -impl ConfigChannelRequest { - #[must_use] - pub fn new(request: ConfigRequest) -> (Self, Receiver) { - let (reply_tx, reply_rx) = oneshot::channel(); - let request = Self { request, reply_tx }; - (request, reply_rx) - } -} - /// Populate FRR status into the dataplane status structure pub async fn populate_status_with_frr( status: &mut DataplaneStatus, From 6576a41ec83c72c8011f9baca41fc91dc40e000c Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Thu, 18 Dec 2025 14:33:04 +0100 Subject: [PATCH 3/5] feat(mgmt): use ConfigClient Let gRPC server and k8s_client use the ConfigClient object (instead of a Sender) to send requests and receive responses from the configuration processor. This simplifies the code and the required types to imnport. Signed-off-by: Fredi Raspall --- mgmt/src/grpc/server.rs | 99 +++++-------------------- mgmt/src/processor/k8s_client.rs | 116 +++++++----------------------- mgmt/src/processor/launch.rs | 28 ++++---- mgmt/src/processor/mgmt_client.rs | 2 - mgmt/src/processor/proc.rs | 11 +-- mgmt/src/tests/mgmt.rs | 4 +- 6 files changed, 65 insertions(+), 195 deletions(-) diff --git a/mgmt/src/grpc/server.rs b/mgmt/src/grpc/server.rs index e0b13520b..baea9bb33 100644 --- a/mgmt/src/grpc/server.rs +++ b/mgmt/src/grpc/server.rs @@ -8,13 +8,12 @@ use std::sync::Arc; use tonic::{Request, Response, Status}; use tracing::{debug, error}; -use crate::processor::mgmt_client::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; +use crate::processor::mgmt_client::ConfigClient; use config::converters::grpc::{ convert_dataplane_status_to_grpc, convert_gateway_config_from_grpc_with_defaults, }; use config::internal::status::DataplaneStatus; use config::{GenId, GwConfig}; -use tokio::sync::mpsc::Sender; // Import proto-generated types use gateway_config::{ @@ -113,116 +112,56 @@ impl ConfigService for ConfigServiceImpl { /// Basic configuration manager implementation pub struct BasicConfigManager { - channel_tx: Sender, + client: ConfigClient, } impl BasicConfigManager { - pub fn new(channel_tx: Sender) -> Self { - Self { channel_tx } + pub fn new(client: ConfigClient) -> Self { + Self { client } } } #[async_trait] impl ConfigManager for BasicConfigManager { async fn get_current_config(&self) -> Result { - debug!("Received request to get current config"); - - // build a request to the config processor, send it and get the response - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetCurrentConfig); - self.channel_tx - .send(req) - .await - .map_err(|_| "Failure relaying request".to_string())?; - let response = rx + let config = self + .client + .get_current_config() .await - .map_err(|_| "Failure receiving from config processor".to_string())?; - match response { - ConfigResponse::GetCurrentConfig(opt_config) => { - if let Some(config) = *opt_config { - gateway_config::GatewayConfig::try_from(&config.external) - } else { - Err("No config is currently applied".to_string()) - } - } - _ => unreachable!(), - } + .map_err(|e| e.to_string())?; + gateway_config::GatewayConfig::try_from(&config.external) } async fn get_generation(&self) -> Result { - debug!("Received request to get current config generation"); - - // build a request to the config processor, send it and get the response - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetGeneration); - self.channel_tx - .send(req) - .await - .map_err(|_| "Failure relaying request".to_string())?; - let response = rx + self.client + .get_generation() .await - .map_err(|_| "Failure receiving from config processor".to_string())?; - match response { - ConfigResponse::GetGeneration(opt_genid) => { - opt_genid.ok_or_else(|| "No config is currently applied".to_string()) - } - _ => unreachable!(), - } + .map_err(|e| e.to_string()) } async fn apply_config(&self, grpc_config: GatewayConfig) -> Result<(), String> { - debug!("Received request to apply new config"); - // Convert config from gRPC to native external model let external_config = convert_gateway_config_from_grpc_with_defaults(&grpc_config) .map_err(|e| { - error!("Failed to process apply config: {e}"); + error!("Failed to parse config: {e}"); e })?; - // Create a new GwConfig with this ExternalConfig - let gw_config = Box::new(GwConfig::new(external_config)); - - // build a request to the config processor, send it and get the response - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config)); - self.channel_tx - .send(req) - .await - .map_err(|_| "Failure relaying request".to_string())?; - let response = rx + self.client + .apply_config(GwConfig::new(external_config)) .await - .map_err(|_| "Failure receiving from config processor".to_string())?; - match response { - ConfigResponse::ApplyConfig(result) => { - result.map_err(|e| format!("Failed to apply config: {e}")) - } - _ => unreachable!(), - } + .map_err(|e| e.to_string()) } async fn get_dataplane_status(&self) -> Result { debug!("Received request to get dataplane status"); - - // build a request to the config processor, send it and get the response - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus); - self.channel_tx - .send(req) - .await - .map_err(|_| "Failure relaying request".to_string())?; - let response = rx - .await - .map_err(|_| "Failure receiving from config processor".to_string())?; - - match response { - ConfigResponse::GetDataplaneStatus(status) => Ok(*status), - _ => unreachable!(), - } + self.client.get_status().await.map_err(|e| e.to_string()) } } /// Function to create the gRPC service -pub fn create_config_service( - channel_tx: Sender, -) -> ConfigServiceServer { - let config_manager = Arc::new(BasicConfigManager::new(channel_tx)); +pub fn create_config_service(client: ConfigClient) -> ConfigServiceServer { + let config_manager = Arc::new(BasicConfigManager::new(client)); let service = ConfigServiceImpl::new(config_manager); ConfigServiceServer::new(service) } diff --git a/mgmt/src/processor/k8s_client.rs b/mgmt/src/processor/k8s_client.rs index 540df617b..f040b33d3 100644 --- a/mgmt/src/processor/k8s_client.rs +++ b/mgmt/src/processor/k8s_client.rs @@ -5,19 +5,18 @@ use std::time::SystemTime; use chrono::{TimeZone, Utc}; use config::converters::k8s::ToK8sConversionError; -use tokio::sync::mpsc::Sender; use config::converters::k8s::status::dataplane_status::DataplaneStatusForK8sConversion; -use config::{ExternalConfig, GwConfig, internal::status::DataplaneStatus}; +use config::{ExternalConfig, GwConfig}; use k8s_intf::client::{ ReplaceStatusError, WatchError, replace_gateway_status, watch_gateway_agent_crd, }; use k8s_intf::gateway_agent_crd::{ GatewayAgentStatus, GatewayAgentStatusState, GatewayAgentStatusStateDataplane, }; -use tracing::{debug, error}; +use tracing::{debug, error, info}; -use crate::processor::mgmt_client::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; +use crate::processor::mgmt_client::{ConfigClient, ConfigProcessorError}; #[derive(Debug, thiserror::Error)] pub enum K8sClientError { @@ -31,43 +30,6 @@ pub enum K8sClientError { ReplaceStatusError(#[from] ReplaceStatusError), } -async fn get_dataplane_status( - tx: &Sender, -) -> Result { - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus); - tx.send(req).await.map_err(|_| { - MgmtStatusError::FetchStatusError("Failure relaying status fetch request".to_string()) - })?; - let response = rx.await.map_err(|e| { - MgmtStatusError::FetchStatusError(format!( - "Failure receiving status from config processor: {e}" - )) - })?; - - match response { - ConfigResponse::GetDataplaneStatus(status) => Ok(*status), - _ => unreachable!(), - } -} - -async fn get_current_config( - tx: &Sender, -) -> Result { - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetCurrentConfig); - tx.send(req).await.map_err(|_| { - MgmtStatusError::FetchStatusError("Failure relaying get config request".to_string()) - })?; - let response = rx.await.map_err(|e| { - MgmtStatusError::FetchStatusError(format!("Failure receiving config from processor: {e}")) - })?; - match response { - ConfigResponse::GetCurrentConfig(opt_config) => { - opt_config.ok_or(MgmtStatusError::NoConfigApplied) - } - _ => unreachable!(), - } -} - fn to_datetime(opt_time: Option<&SystemTime>) -> chrono::DateTime { match opt_time { Some(time) => chrono::DateTime::::from(*time), @@ -75,9 +37,8 @@ fn to_datetime(opt_time: Option<&SystemTime>) -> chrono::DateTime { } } -async fn update_gateway_status(hostname: &str, tx: &Sender) -> () { - let status = get_dataplane_status(tx).await; - +async fn update_gateway_status(hostname: &str, client: &ConfigClient) -> () { + let status = client.get_status().await; let status = match status { Ok(status) => status, Err(err) => { @@ -89,7 +50,7 @@ async fn update_gateway_status(hostname: &str, tx: &Sender } }; - let (last_applied_gen, last_applied_time) = match get_current_config(tx).await { + let (last_applied_gen, last_applied_time) = match client.get_current_config().await { Ok(config) => (config.genid(), to_datetime(config.meta.apply_t.as_ref())), Err(e) => { error!("Failed to get current config, skipping status update: {e}"); @@ -119,14 +80,6 @@ async fn update_gateway_status(hostname: &str, tx: &Sender } } -#[derive(Debug, thiserror::Error)] -enum MgmtStatusError { - #[error("Failed to fetch dataplane status: {0}")] - FetchStatusError(String), - #[error("No config is currently applied")] - NoConfigApplied, -} - pub struct K8sClient { hostname: String, } @@ -168,10 +121,7 @@ impl K8sClient { Ok(()) } - pub async fn k8s_start_config_watch( - &self, - tx: Sender, - ) -> Result<(), K8sClientError> { + pub async fn k8s_start_config_watch(&self, client: ConfigClient) -> Result<(), K8sClientError> { // Clone this here so that the closure does not try to borrow self // and cause K8sClient to not be Send for 'static but only a specific // lifetime @@ -179,46 +129,32 @@ impl K8sClient { watch_gateway_agent_crd(&hostname.clone(), async move |ga| { let external_config = ExternalConfig::try_from(ga); match external_config { + Err(e) => error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}"), Ok(external_config) => { let genid = external_config.genid; - let current_genid = match get_current_config(&tx).await { - Ok(config) => config.genid(), - Err(e) => match e { - MgmtStatusError::NoConfigApplied => 0, - _ => { - error!("Failed to get current config generation: {e}"); - return; - } + let applied_genid = match client.get_generation().await { + Ok(genid) => genid, + Err(ConfigProcessorError::NoConfigApplied) => 0, + Err(e) => { + error!("Failed to get current config generation: {e}"); + return; } }; - if current_genid == genid { - debug!("Not applying config, configuration generation unchanged (old={current_genid}, new={genid})"); + if applied_genid == genid { + debug!("Not applying config, configuration generation unchanged (old={applied_genid}, new={genid})"); return; } - let gw_config = Box::new(GwConfig::new(external_config)); + let gwconfig = GwConfig::new(external_config); - let (req, rx) = - ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config)); - let tx_result = tx.send(req).await; - if let Err(e) = tx_result { - error!("Failure sending request to config processor: {e}"); - } - match rx.await { - Err(e) => error!("Failure receiving from config processor: {e}"), - Ok(response) => match response { - ConfigResponse::ApplyConfig(Err(e)) => { - error!("Failed to apply config: {e}"); - } - ConfigResponse::ApplyConfig(Ok(())) => { - update_gateway_status(&hostname, &tx).await; - } - _ => unreachable!(), + // request the config processor to apply the config and update status on success + match client.apply_config(gwconfig).await { + Ok(()) => { + info!("Config for generation {genid} was successfully applied. Updating status..."); + update_gateway_status(&hostname, &client).await; }, - }; - } - Err(e) => { - error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}"); + Err(e) => error!("Failed to apply the config for generation {genid}: {e}"), + } } } }) @@ -228,11 +164,11 @@ impl K8sClient { pub async fn k8s_start_status_update( &self, - tx: Sender, + client: ConfigClient, status_update_interval: &std::time::Duration, ) -> Result<(), K8sClientError> { loop { - update_gateway_status(&self.hostname, &tx).await; + update_gateway_status(&self.hostname, &client).await; tokio::time::sleep(*status_update_interval).await; } } diff --git a/mgmt/src/processor/launch.rs b/mgmt/src/processor/launch.rs index 566f0174b..cb0de2b69 100644 --- a/mgmt/src/processor/launch.rs +++ b/mgmt/src/processor/launch.rs @@ -2,7 +2,6 @@ // Copyright Open Network Fabric Authors use crate::processor::k8s_client::{K8sClient, K8sClientError}; -use crate::processor::mgmt_client::ConfigChannelRequest; use crate::processor::proc::ConfigProcessor; use std::fmt::Display; @@ -14,7 +13,6 @@ use std::task::{Context, Poll}; use tokio::io; use tokio::net::UnixListener; -use tokio::sync::mpsc::Sender; use tokio_stream::Stream; use tonic::transport::Server; @@ -25,6 +23,7 @@ use concurrency::sync::Arc; use tracing::{debug, error, info, warn}; use crate::grpc::server::create_config_service; +use crate::processor::mgmt_client::ConfigClient; use crate::processor::proc::ConfigProcessorParams; #[derive(Debug, thiserror::Error)] @@ -51,12 +50,9 @@ pub enum LaunchError { } /// Start the gRPC server on TCP -async fn start_grpc_server_tcp( - addr: SocketAddr, - channel_tx: Sender, -) -> Result<(), LaunchError> { +async fn start_grpc_server_tcp(addr: SocketAddr, client: ConfigClient) -> Result<(), LaunchError> { info!("Starting gRPC server on TCP address: {addr}"); - let config_service = create_config_service(channel_tx); + let config_service = create_config_service(client); Server::builder() .add_service(config_service) @@ -97,7 +93,7 @@ impl Stream for UnixAcceptor { /// Start the gRPC server on UNIX socket async fn start_grpc_server_unix( socket_path: &Path, - channel_tx: Sender, + client: ConfigClient, ) -> Result<(), LaunchError> { info!( "Starting gRPC server on UNIX socket: {}", @@ -145,7 +141,7 @@ async fn start_grpc_server_unix( let acceptor = UnixAcceptor { listener }; // Create the gRPC service - let config_service = create_config_service(channel_tx); + let config_service = create_config_service(client); // Start the server with UNIX domain socket Server::builder() @@ -217,13 +213,13 @@ pub fn start_mgmt( /* block thread to run gRPC and configuration processor */ rt.block_on(async { - let (processor, tx) = ConfigProcessor::new(params.processor_params); + let (processor, client) = ConfigProcessor::new(params.processor_params); tokio::spawn(async { processor.run().await }); // Start the appropriate server based on address type let result = match server_address { - ServerAddress::Tcp(sock_addr) => start_grpc_server_tcp(sock_addr, tx).await, - ServerAddress::Unix(path) => start_grpc_server_unix(&path, tx).await, + ServerAddress::Tcp(sock_addr) => start_grpc_server_tcp(sock_addr, client).await, + ServerAddress::Unix(path) => start_grpc_server_unix(&path, client).await, }; if let Err(e) = result { error!("Failed to start gRPC server: {e}"); @@ -237,8 +233,8 @@ pub fn start_mgmt( debug!("Will start watching k8s for configuration changes"); rt.block_on(async { let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str())); - let (processor, tx) = ConfigProcessor::new(params.processor_params); - let tx1 = tx.clone(); + let (processor, client) = ConfigProcessor::new(params.processor_params); + let client1 = client.clone(); let k8s_client1 = k8s_client.clone(); k8s_client.init().await.map_err(|e| { @@ -246,9 +242,9 @@ pub fn start_mgmt( LaunchError::K8sClientError(e) })?; let mut processor_handle = Some(tokio::spawn(async { processor.run().await })); - let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(tx).await })); + let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(client).await })); let mut k8s_status_handle = Some(tokio::spawn(async move { - k8s_client1.k8s_start_status_update(tx1, &STATUS_UPDATE_INTERVAL).await + k8s_client1.k8s_start_status_update(client1, &STATUS_UPDATE_INTERVAL).await })); loop { tokio::select! { diff --git a/mgmt/src/processor/mgmt_client.rs b/mgmt/src/processor/mgmt_client.rs index 49038c7ef..e8e230453 100644 --- a/mgmt/src/processor/mgmt_client.rs +++ b/mgmt/src/processor/mgmt_client.rs @@ -3,8 +3,6 @@ //! Interface to management processor -#![allow(unused)] // TEMPORARY - use config::ConfigError; use config::ConfigResult; use config::GenId; diff --git a/mgmt/src/processor/proc.rs b/mgmt/src/processor/proc.rs index a65228eee..91a93a9df 100644 --- a/mgmt/src/processor/proc.rs +++ b/mgmt/src/processor/proc.rs @@ -8,7 +8,6 @@ use std::collections::{HashMap, HashSet}; use tokio::spawn; use tokio::sync::mpsc; -use tokio::sync::mpsc::Sender; use config::external::overlay::vpc::VpcTable; use config::internal::status::{ @@ -28,7 +27,9 @@ use pkt_meta::dst_vpcd_lookup::setup::build_dst_vni_lookup_configuration; use crate::processor::display::GwConfigDatabaseSummary; use crate::processor::gwconfigdb::GwConfigDatabase; -use crate::processor::mgmt_client::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; +use crate::processor::mgmt_client::{ + ConfigChannelRequest, ConfigClient, ConfigRequest, ConfigResponse, +}; use crate::vpc_manager::{RequiredInformationBase, VpcManager}; use rekon::{Observe, Reconcile}; @@ -91,10 +92,10 @@ impl ConfigProcessor { const CHANNEL_SIZE: usize = 1; // This should not be changed ///////////////////////////////////////////////////////////////////////////////// - /// Create a [`ConfigProcessor`] + /// Create a [`ConfigProcessor`] and return a [`ConfigClient`] to interact with it ///////////////////////////////////////////////////////////////////////////////// #[must_use] - pub(crate) fn new(proc_params: ConfigProcessorParams) -> (Self, Sender) { + pub(crate) fn new(proc_params: ConfigProcessorParams) -> (Self, ConfigClient) { debug!("Creating config processor..."); let (tx, rx) = mpsc::channel(Self::CHANNEL_SIZE); @@ -112,7 +113,7 @@ impl ConfigProcessor { vpc_mgr, proc_params, }; - (processor, tx) + (processor, ConfigClient::new(tx)) } /// Main entry point for new configurations diff --git a/mgmt/src/tests/mgmt.rs b/mgmt/src/tests/mgmt.rs index 82ef93156..13674e2ea 100644 --- a/mgmt/src/tests/mgmt.rs +++ b/mgmt/src/tests/mgmt.rs @@ -444,8 +444,8 @@ pub mod test { vpc_stats_store, }; - /* start config processor to test the processing of a config. The processor embeds the config database - and has the frrmi. In this test, we don't use any channel to communicate the config. */ + /* start config processor to test the processing of a config. The processor embeds the + config database . In this test, we don't use any channel to communicate the config. */ let (mut processor, _) = ConfigProcessor::new(processor_config); /* let the processor process the config */ From 0ae14113d909ab60f70d3a101ce32d4eb97a0bd8 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Thu, 18 Dec 2025 22:11:32 +0100 Subject: [PATCH 4/5] feat(mgmt): let k8s_client have a ConfigClient Let the k8s_client own a ConfigClient so that it does not need to be propagated throughout. Signed-off-by: Fredi Raspall --- mgmt/src/processor/k8s_client.rs | 22 ++++++++++------------ mgmt/src/processor/launch.rs | 7 +++---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/mgmt/src/processor/k8s_client.rs b/mgmt/src/processor/k8s_client.rs index f040b33d3..eba11290a 100644 --- a/mgmt/src/processor/k8s_client.rs +++ b/mgmt/src/processor/k8s_client.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors +use std::sync::Arc; use std::time::SystemTime; use chrono::{TimeZone, Utc}; @@ -82,12 +83,14 @@ async fn update_gateway_status(hostname: &str, client: &ConfigClient) -> () { pub struct K8sClient { hostname: String, + client: ConfigClient, } impl K8sClient { - pub fn new(hostname: &str) -> Self { + pub fn new(hostname: &str, client: ConfigClient) -> Self { Self { hostname: hostname.to_string(), + client, } } @@ -121,18 +124,14 @@ impl K8sClient { Ok(()) } - pub async fn k8s_start_config_watch(&self, client: ConfigClient) -> Result<(), K8sClientError> { - // Clone this here so that the closure does not try to borrow self - // and cause K8sClient to not be Send for 'static but only a specific - // lifetime - let hostname = self.hostname.clone(); - watch_gateway_agent_crd(&hostname.clone(), async move |ga| { + pub async fn k8s_start_config_watch(k8s_client: Arc) -> Result<(), K8sClientError> { + watch_gateway_agent_crd(&k8s_client.hostname.clone(), async move |ga| { let external_config = ExternalConfig::try_from(ga); match external_config { Err(e) => error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}"), Ok(external_config) => { let genid = external_config.genid; - let applied_genid = match client.get_generation().await { + let applied_genid = match k8s_client.client.get_generation().await { Ok(genid) => genid, Err(ConfigProcessorError::NoConfigApplied) => 0, Err(e) => { @@ -148,10 +147,10 @@ impl K8sClient { let gwconfig = GwConfig::new(external_config); // request the config processor to apply the config and update status on success - match client.apply_config(gwconfig).await { + match k8s_client.client.apply_config(gwconfig).await { Ok(()) => { info!("Config for generation {genid} was successfully applied. Updating status..."); - update_gateway_status(&hostname, &client).await; + update_gateway_status(&k8s_client.hostname, &k8s_client.client).await; }, Err(e) => error!("Failed to apply the config for generation {genid}: {e}"), } @@ -164,11 +163,10 @@ impl K8sClient { pub async fn k8s_start_status_update( &self, - client: ConfigClient, status_update_interval: &std::time::Duration, ) -> Result<(), K8sClientError> { loop { - update_gateway_status(&self.hostname, &client).await; + update_gateway_status(&self.hostname, &self.client).await; tokio::time::sleep(*status_update_interval).await; } } diff --git a/mgmt/src/processor/launch.rs b/mgmt/src/processor/launch.rs index cb0de2b69..e51b17fa8 100644 --- a/mgmt/src/processor/launch.rs +++ b/mgmt/src/processor/launch.rs @@ -232,9 +232,8 @@ pub fn start_mgmt( } else { debug!("Will start watching k8s for configuration changes"); rt.block_on(async { - let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str())); let (processor, client) = ConfigProcessor::new(params.processor_params); - let client1 = client.clone(); + let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str(), client)); let k8s_client1 = k8s_client.clone(); k8s_client.init().await.map_err(|e| { @@ -242,9 +241,9 @@ pub fn start_mgmt( LaunchError::K8sClientError(e) })?; let mut processor_handle = Some(tokio::spawn(async { processor.run().await })); - let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(client).await })); + let mut k8s_config_handle = Some(tokio::spawn(async move { K8sClient::k8s_start_config_watch(k8s_client).await })); let mut k8s_status_handle = Some(tokio::spawn(async move { - k8s_client1.k8s_start_status_update(client1, &STATUS_UPDATE_INTERVAL).await + k8s_client1.k8s_start_status_update(&STATUS_UPDATE_INTERVAL).await })); loop { tokio::select! { From 3be1c370197909ca5ff83d6c7c3760195478437d Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Thu, 18 Dec 2025 22:16:18 +0100 Subject: [PATCH 5/5] feat(mgmt): make update_gateway_status a method of k8s_client Signed-off-by: Fredi Raspall --- mgmt/src/processor/k8s_client.rs | 90 ++++++++++++++++---------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/mgmt/src/processor/k8s_client.rs b/mgmt/src/processor/k8s_client.rs index eba11290a..755833b28 100644 --- a/mgmt/src/processor/k8s_client.rs +++ b/mgmt/src/processor/k8s_client.rs @@ -38,49 +38,6 @@ fn to_datetime(opt_time: Option<&SystemTime>) -> chrono::DateTime { } } -async fn update_gateway_status(hostname: &str, client: &ConfigClient) -> () { - let status = client.get_status().await; - let status = match status { - Ok(status) => status, - Err(err) => { - error!( - "Failed to fetch dataplane status, skipping status update: {}", - err - ); - return; - } - }; - - let (last_applied_gen, last_applied_time) = match client.get_current_config().await { - Ok(config) => (config.genid(), to_datetime(config.meta.apply_t.as_ref())), - Err(e) => { - error!("Failed to get current config, skipping status update: {e}"); - return; - } - }; - - let k8s_status = match GatewayAgentStatus::try_from(&DataplaneStatusForK8sConversion { - last_applied_gen: Some(last_applied_gen), - last_applied_time: Some(&last_applied_time), - last_collected_time: Some(&chrono::Utc::now()), - last_heartbeat: Some(&chrono::Utc::now()), - status: Some(&status), - }) { - Ok(status) => status, - Err(err) => { - error!("Failed to convert status to GatewayAgentStatus: {err}"); - return; - } - }; - - match replace_gateway_status(hostname, &k8s_status).await { - Ok(()) => (), - Err(err) => { - error!("Failed to update gateway status: {err}"); - } - } -} - pub struct K8sClient { hostname: String, client: ConfigClient, @@ -124,6 +81,49 @@ impl K8sClient { Ok(()) } + async fn update_gateway_status(&self) -> () { + let status = self.client.get_status().await; + let status = match status { + Ok(status) => status, + Err(err) => { + error!( + "Failed to fetch dataplane status, skipping status update: {}", + err + ); + return; + } + }; + + let (last_applied_gen, last_applied_time) = match self.client.get_current_config().await { + Ok(config) => (config.genid(), to_datetime(config.meta.apply_t.as_ref())), + Err(e) => { + error!("Failed to get current config, skipping status update: {e}"); + return; + } + }; + + let k8s_status = match GatewayAgentStatus::try_from(&DataplaneStatusForK8sConversion { + last_applied_gen: Some(last_applied_gen), + last_applied_time: Some(&last_applied_time), + last_collected_time: Some(&chrono::Utc::now()), + last_heartbeat: Some(&chrono::Utc::now()), + status: Some(&status), + }) { + Ok(status) => status, + Err(err) => { + error!("Failed to convert status to GatewayAgentStatus: {err}"); + return; + } + }; + + match replace_gateway_status(&self.hostname, &k8s_status).await { + Ok(()) => (), + Err(err) => { + error!("Failed to update gateway status: {err}"); + } + } + } + pub async fn k8s_start_config_watch(k8s_client: Arc) -> Result<(), K8sClientError> { watch_gateway_agent_crd(&k8s_client.hostname.clone(), async move |ga| { let external_config = ExternalConfig::try_from(ga); @@ -150,7 +150,7 @@ impl K8sClient { match k8s_client.client.apply_config(gwconfig).await { Ok(()) => { info!("Config for generation {genid} was successfully applied. Updating status..."); - update_gateway_status(&k8s_client.hostname, &k8s_client.client).await; + k8s_client.update_gateway_status().await; }, Err(e) => error!("Failed to apply the config for generation {genid}: {e}"), } @@ -166,7 +166,7 @@ impl K8sClient { status_update_interval: &std::time::Duration, ) -> Result<(), K8sClientError> { loop { - update_gateway_status(&self.hostname, &self.client).await; + self.update_gateway_status().await; tokio::time::sleep(*status_update_interval).await; } }