Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions orion-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ use orion_configuration::config::{
Bootstrap, Cluster, Listener as ListenerConfig,
};
pub use secrets::SecretManager;
pub use transport::internal_cluster_connector::cluster_helpers;
pub(crate) use transport::AsyncStream;
pub use transport::{
global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory, InternalConnectionPair,
InternalConnectionStats, InternalListenerHandle,
};

pub type Error = orion_error::Error;
pub type Result<T> = ::core::result::Result<T, Error>;
Expand Down
6 changes: 6 additions & 0 deletions orion-lib/src/listeners/filter_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub enum DownstreamConnectionMetadata {
proxy_peer_address: SocketAddr,
proxy_local_address: SocketAddr,
},
FromInternal {
listener_name: String,
endpoint_id: Option<String>,
},
}

impl DownstreamConnectionMetadata {
Expand All @@ -47,13 +51,15 @@ impl DownstreamConnectionMetadata {
Self::FromSocket { peer_address, .. } => *peer_address,
Self::FromProxyProtocol { original_peer_address, .. } => *original_peer_address,
Self::FromTlv { proxy_peer_address, .. } => *proxy_peer_address,
Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
}
}
pub fn local_address(&self) -> SocketAddr {
match self {
Self::FromSocket { local_address, .. } => *local_address,
Self::FromProxyProtocol { original_destination_address, .. } => *original_destination_address,
Self::FromTlv { original_destination_address, .. } => *original_destination_address,
Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use :Ipv4Addr::LOCALHOST here

In order to make it easy to understand, i think you should first write a proposal following https://github.com/kmesh-net/kmesh/blob/main/docs/proposal/template.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've replaced the LOCALHOST:0 approach with a special reserved address range

}
}
}
Expand Down
105 changes: 98 additions & 7 deletions orion-lib/src/listeners/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum ListenerAddress {

#[derive(Debug, Clone)]
struct InternalListenerConfig {
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buffer_size_kb field is marked as dead_code but appears to be part of the public API for InternalListenerConfig. Consider implementing its usage or documenting why it's currently unused.

Suggested change
struct InternalListenerConfig {
struct InternalListenerConfig {
/// Optional buffer size in kilobytes for internal listeners.
/// This field is currently unused, but is retained for compatibility with configuration schemas
/// and for potential future use. See issue #TODO for details.

Copilot uses AI. Check for mistakes.

#[allow(dead_code)]
buffer_size_kb: Option<u32>,
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buffer_size_kb field is marked as dead code but appears to be part of the configuration structure. Consider either implementing its usage or removing it if it's not needed for the current implementation phase.

Copilot uses AI. Check for mistakes.

}
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -339,31 +340,121 @@ impl Listener {
mut route_updates_receiver: broadcast::Receiver<RouteConfigurationChange>,
mut secret_updates_receiver: broadcast::Receiver<TlsContextChange>,
) -> Error {
use crate::transport::global_internal_connection_factory;
use tracing::{debug, error, info, warn};

let filter_chains = Arc::new(filter_chains);
let factory = global_internal_connection_factory();

let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_string()).await
{
Ok(result) => result,
Err(e) => {
error!("Failed to register internal listener '{}': {}", name, e);
return e;
},
};

info!("Internal listener '{}' registered with connection factory", name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
info!("Internal listener '{}' registered with connection factory", name);
info!("Internal listener '{}' registered to connection factory", name);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated that


// For now, internal listeners just wait for updates
// The actual connection handling will be implemented when we add the internal connection factory
loop {
tokio::select! {
maybe_connection = connection_receiver.recv() => {
match maybe_connection {
Some(connection_pair) => {
debug!("Internal listener '{}' received new connection", name);

let filter_chains_clone = filter_chains.clone();
let listener_name = name.to_string();

tokio::spawn(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is costy if the short connections number is very large, could we use a pool here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated that

if let Err(e) = Self::handle_internal_connection(
listener_name,
connection_pair,
filter_chains_clone,
).await {
warn!("Error handling internal connection: {}", e);
}
});
}
None => {
warn!("Internal listener '{}' connection channel closed", name);
break;
}
}
},
maybe_route_update = route_updates_receiver.recv() => {
match maybe_route_update {
Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update);}
Err(e) => {return e.into();}
Ok(route_update) => {
Self::process_route_update(&name, &filter_chains, route_update);
}
Err(e) => {
error!("Route update error for internal listener '{}': {}", name, e);
return e.into();
}
}
},
maybe_secret_update = secret_updates_receiver.recv() => {
match maybe_secret_update {
Ok(secret_update) => {
let mut filter_chains_clone = filter_chains.as_ref().clone();
Self::process_secret_update(&name, &mut filter_chains_clone, secret_update);
// Note: For internal listeners, we'd need to update the shared state
// This will be implemented when we add the internal connection factory
// TODO: Update the shared filter chains state for active connections
}
Err(e) => {
error!("Secret update error for internal listener '{}': {}", name, e);
return e.into();
}
Err(e) => {return e.into();}
}
}
}
}

if let Err(e) = factory.unregister_listener(name).await {
warn!("Failed to unregister internal listener '{}': {}", name, e);
}

info!("Internal listener '{}' shutting down", name);
Error::new("Internal listener shutdown")
}

async fn handle_internal_connection(
listener_name: String,
connection_pair: crate::transport::InternalConnectionPair,
filter_chains: Arc<HashMap<FilterChainMatch, FilterchainType>>,
) -> Result<()> {
use crate::listeners::filter_state::DownstreamConnectionMetadata;
use tracing::{debug, info, warn};

debug!("Handling new internal connection for listener '{}'", listener_name);

let downstream_metadata = DownstreamConnectionMetadata::FromInternal {
listener_name: listener_name.clone(),
endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not quite understand the fields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields identify internal connections:

  • listener_name: Which internal listener accepted the connection (e.g., "waypoint_inbound")
  • endpoint_id: Optional ID for load balancing when multiple endpoints exist (e.g., "pod-123")

Since internal connections use in-memory channels (not real sockets), these fields replace traditional IP/port-based identification for routing and filter chain selection.

};

let filter_chain = match Self::select_filterchain(&filter_chains, &downstream_metadata, None)? {
Some(fc) => fc,
None => {
warn!("No matching filter chain found for internal connection");
return Err(crate::Error::new("No matching filter chain"));
},
};

let _downstream_stream = connection_pair.downstream;

match &filter_chain.handler {
crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => {
info!("Processing internal connection through HTTP filter chain");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
},
crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
info!("Processing internal connection through TCP filter chain");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
},
}
}

fn select_filterchain<'a, T>(
Expand Down
5 changes: 4 additions & 1 deletion orion-lib/src/listeners/listeners_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ use tokio::sync::{broadcast, mpsc};
use tracing::{info, warn};

use orion_configuration::config::{
listener::ListenerAddress, network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig,
network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig,
};

#[cfg(test)]
use orion_configuration::config::listener::ListenerAddress;

use super::listener::{Listener, ListenerFactory};
use crate::{secrets::TransportSecret, ConfigDump, Result};
#[derive(Debug, Clone)]
Expand Down
207 changes: 207 additions & 0 deletions orion-lib/src/transport/internal_cluster_connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2025 The kmesh Authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

use super::{global_internal_connection_factory, AsyncStream};
use crate::{Error, Result};

#[derive(Debug, Clone)]
pub struct InternalClusterConnector {
listener_name: String,
endpoint_id: Option<String>,
}

impl InternalClusterConnector {
pub fn new(listener_name: String, endpoint_id: Option<String>) -> Self {
Self { listener_name, endpoint_id }
}

pub fn listener_name(&self) -> &str {
&self.listener_name
}

pub fn endpoint_id(&self) -> Option<&str> {
self.endpoint_id.as_deref()
}

pub async fn connect(&self) -> Result<AsyncStream> {
let factory = global_internal_connection_factory();

if !factory.is_listener_active(&self.listener_name).await {
return Err(Error::new(format!(
"Internal listener '{}' is not active or not registered",
self.listener_name
)));
}

factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await
}

pub async fn is_available(&self) -> bool {
let factory = global_internal_connection_factory();
factory.is_listener_active(&self.listener_name).await
}
}

#[derive(Debug, Clone)]
pub struct InternalChannelConnector {
connector: InternalClusterConnector,
cluster_name: &'static str,
}

impl InternalChannelConnector {
pub fn new(listener_name: String, cluster_name: &'static str, endpoint_id: Option<String>) -> Self {
let connector = InternalClusterConnector::new(listener_name, endpoint_id);

Self { connector, cluster_name }
}

pub fn cluster_name(&self) -> &'static str {
self.cluster_name
}

pub fn listener_name(&self) -> &str {
self.connector.listener_name()
}

pub async fn connect(&self) -> Result<InternalChannel> {
let stream = self.connector.connect().await?;

Ok(InternalChannel {
stream,
cluster_name: self.cluster_name,
listener_name: self.connector.listener_name().to_string(),
endpoint_id: self.connector.endpoint_id().map(|s| s.to_string()),
})
}

pub async fn is_available(&self) -> bool {
self.connector.is_available().await
}
}

pub struct InternalChannel {
pub stream: AsyncStream,
pub cluster_name: &'static str,
pub listener_name: String,
pub endpoint_id: Option<String>,
}

impl InternalChannel {
pub fn cluster_name(&self) -> &'static str {
self.cluster_name
}

pub fn listener_name(&self) -> &str {
&self.listener_name
}

pub fn endpoint_id(&self) -> Option<&str> {
self.endpoint_id.as_deref()
}
}

pub mod cluster_helpers {
use super::*;
use orion_configuration::config::cluster::InternalEndpointAddress;

pub fn create_internal_connector(
internal_addr: &InternalEndpointAddress,
cluster_name: &'static str,
) -> InternalChannelConnector {
InternalChannelConnector::new(
internal_addr.server_listener_name.to_string(),
cluster_name,
internal_addr.endpoint_id.as_ref().map(|s| s.to_string()),
)
}

pub async fn is_internal_listener_available(listener_name: &str) -> bool {
let factory = global_internal_connection_factory();
factory.is_listener_active(listener_name).await
}

pub async fn get_internal_connection_stats() -> crate::transport::InternalConnectionStats {
let factory = global_internal_connection_factory();
factory.get_stats().await
}

pub async fn list_internal_listeners() -> Vec<String> {
let factory = global_internal_connection_factory();
factory.list_listeners().await
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_internal_connector_creation() {
let connector = InternalClusterConnector::new(String::from("test_listener"), Some(String::from("endpoint1")));
assert_eq!(connector.listener_name(), "test_listener");
assert_eq!(connector.endpoint_id(), Some("endpoint1"));
}

#[tokio::test]
async fn test_connection_to_non_existent_listener() {
let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None);
let result = connector.connect().await;
assert!(result.is_err());
}

#[tokio::test]
async fn test_availability_check() {
let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None);
assert!(!connector.is_available().await);
}

#[tokio::test]
async fn test_internal_channel_connector() {
let channel_connector = InternalChannelConnector::new(
String::from("test_listener"),
"test_cluster",
Some(String::from("endpoint1")),
);

assert_eq!(channel_connector.cluster_name(), "test_cluster");
assert_eq!(channel_connector.listener_name(), "test_listener");
assert!(!channel_connector.is_available().await);
}

#[tokio::test]
async fn test_cluster_helpers() {
use cluster_helpers::*;
use orion_configuration::config::cluster::InternalEndpointAddress;

let internal_addr = InternalEndpointAddress {
server_listener_name: String::from("test_listener").into(),
endpoint_id: Some(String::from("endpoint1").into()),
};

let connector = create_internal_connector(&internal_addr, "test_cluster");
assert_eq!(connector.cluster_name(), "test_cluster");
assert_eq!(connector.listener_name(), "test_listener");

assert!(!is_internal_listener_available("non_existent").await);

let stats = get_internal_connection_stats().await;
assert_eq!(stats.active_listeners, 0);

let listeners = list_internal_listeners().await;
assert!(listeners.is_empty());
}
}
Loading
Loading