-
Notifications
You must be signed in to change notification settings - Fork 18
feat: implement internal connection factory for in-process communication #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
7574a84
e942545
d32ae90
b6abbbf
33e8c58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -73,6 +73,7 @@ enum ListenerAddress { | |||||||||||
|
||||||||||||
#[derive(Debug, Clone)] | ||||||||||||
struct InternalListenerConfig { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The buffer_size_kb field is marked as dead_code but appears to be part of the public API for InternalListenerConfig. Consider implementing its usage or documenting why it's currently unused.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||
#[allow(dead_code)] | ||||||||||||
buffer_size_kb: Option<u32>, | ||||||||||||
|
||||||||||||
} | ||||||||||||
#[derive(Debug, Clone)] | ||||||||||||
|
@@ -339,31 +340,121 @@ impl Listener { | |||||||||||
mut route_updates_receiver: broadcast::Receiver<RouteConfigurationChange>, | ||||||||||||
mut secret_updates_receiver: broadcast::Receiver<TlsContextChange>, | ||||||||||||
) -> Error { | ||||||||||||
use crate::transport::global_internal_connection_factory; | ||||||||||||
use tracing::{debug, error, info, warn}; | ||||||||||||
|
||||||||||||
let filter_chains = Arc::new(filter_chains); | ||||||||||||
let factory = global_internal_connection_factory(); | ||||||||||||
|
||||||||||||
let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_string()).await | ||||||||||||
{ | ||||||||||||
Ok(result) => result, | ||||||||||||
Err(e) => { | ||||||||||||
error!("Failed to register internal listener '{}': {}", name, e); | ||||||||||||
return e; | ||||||||||||
}, | ||||||||||||
}; | ||||||||||||
|
||||||||||||
info!("Internal listener '{}' registered with connection factory", name); | ||||||||||||
|
info!("Internal listener '{}' registered with connection factory", name); | |
info!("Internal listener '{}' registered to connection factory", name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated that
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is costy if the short connections number is very large, could we use a pool here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated that
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not quite understand the fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These fields identify internal connections:
listener_name
: Which internal listener accepted the connection (e.g., "waypoint_inbound")- endpoint_id: Optional ID for load balancing when multiple endpoints exist (e.g., "pod-123")
Since internal connections use in-memory channels (not real sockets), these fields replace traditional IP/port-based identification for routing and filter chain selection.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
// Copyright 2025 The kmesh Authors | ||
// | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
// | ||
|
||
use super::{global_internal_connection_factory, AsyncStream}; | ||
use crate::{Error, Result}; | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct InternalClusterConnector { | ||
listener_name: String, | ||
endpoint_id: Option<String>, | ||
} | ||
|
||
impl InternalClusterConnector { | ||
pub fn new(listener_name: String, endpoint_id: Option<String>) -> Self { | ||
Self { listener_name, endpoint_id } | ||
} | ||
|
||
pub fn listener_name(&self) -> &str { | ||
&self.listener_name | ||
} | ||
|
||
pub fn endpoint_id(&self) -> Option<&str> { | ||
self.endpoint_id.as_deref() | ||
} | ||
|
||
pub async fn connect(&self) -> Result<AsyncStream> { | ||
let factory = global_internal_connection_factory(); | ||
|
||
if !factory.is_listener_active(&self.listener_name).await { | ||
return Err(Error::new(format!( | ||
"Internal listener '{}' is not active or not registered", | ||
self.listener_name | ||
))); | ||
} | ||
|
||
factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await | ||
} | ||
|
||
pub async fn is_available(&self) -> bool { | ||
let factory = global_internal_connection_factory(); | ||
factory.is_listener_active(&self.listener_name).await | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct InternalChannelConnector { | ||
connector: InternalClusterConnector, | ||
cluster_name: &'static str, | ||
} | ||
|
||
impl InternalChannelConnector { | ||
pub fn new(listener_name: String, cluster_name: &'static str, endpoint_id: Option<String>) -> Self { | ||
let connector = InternalClusterConnector::new(listener_name, endpoint_id); | ||
|
||
Self { connector, cluster_name } | ||
} | ||
|
||
pub fn cluster_name(&self) -> &'static str { | ||
self.cluster_name | ||
} | ||
|
||
pub fn listener_name(&self) -> &str { | ||
self.connector.listener_name() | ||
} | ||
|
||
pub async fn connect(&self) -> Result<InternalChannel> { | ||
let stream = self.connector.connect().await?; | ||
|
||
Ok(InternalChannel { | ||
stream, | ||
cluster_name: self.cluster_name, | ||
listener_name: self.connector.listener_name().to_string(), | ||
endpoint_id: self.connector.endpoint_id().map(|s| s.to_string()), | ||
}) | ||
} | ||
|
||
pub async fn is_available(&self) -> bool { | ||
self.connector.is_available().await | ||
} | ||
} | ||
|
||
pub struct InternalChannel { | ||
pub stream: AsyncStream, | ||
pub cluster_name: &'static str, | ||
pub listener_name: String, | ||
pub endpoint_id: Option<String>, | ||
} | ||
|
||
impl InternalChannel { | ||
pub fn cluster_name(&self) -> &'static str { | ||
self.cluster_name | ||
} | ||
|
||
pub fn listener_name(&self) -> &str { | ||
&self.listener_name | ||
} | ||
|
||
pub fn endpoint_id(&self) -> Option<&str> { | ||
self.endpoint_id.as_deref() | ||
} | ||
} | ||
|
||
pub mod cluster_helpers { | ||
use super::*; | ||
use orion_configuration::config::cluster::InternalEndpointAddress; | ||
|
||
pub fn create_internal_connector( | ||
internal_addr: &InternalEndpointAddress, | ||
cluster_name: &'static str, | ||
) -> InternalChannelConnector { | ||
InternalChannelConnector::new( | ||
internal_addr.server_listener_name.to_string(), | ||
cluster_name, | ||
internal_addr.endpoint_id.as_ref().map(|s| s.to_string()), | ||
) | ||
} | ||
|
||
pub async fn is_internal_listener_available(listener_name: &str) -> bool { | ||
let factory = global_internal_connection_factory(); | ||
factory.is_listener_active(listener_name).await | ||
} | ||
|
||
pub async fn get_internal_connection_stats() -> crate::transport::InternalConnectionStats { | ||
let factory = global_internal_connection_factory(); | ||
factory.get_stats().await | ||
} | ||
|
||
pub async fn list_internal_listeners() -> Vec<String> { | ||
let factory = global_internal_connection_factory(); | ||
factory.list_listeners().await | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
||
#[tokio::test] | ||
async fn test_internal_connector_creation() { | ||
let connector = InternalClusterConnector::new(String::from("test_listener"), Some(String::from("endpoint1"))); | ||
assert_eq!(connector.listener_name(), "test_listener"); | ||
assert_eq!(connector.endpoint_id(), Some("endpoint1")); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_connection_to_non_existent_listener() { | ||
let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None); | ||
let result = connector.connect().await; | ||
assert!(result.is_err()); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_availability_check() { | ||
let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None); | ||
assert!(!connector.is_available().await); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_internal_channel_connector() { | ||
let channel_connector = InternalChannelConnector::new( | ||
String::from("test_listener"), | ||
"test_cluster", | ||
Some(String::from("endpoint1")), | ||
); | ||
|
||
assert_eq!(channel_connector.cluster_name(), "test_cluster"); | ||
assert_eq!(channel_connector.listener_name(), "test_listener"); | ||
assert!(!channel_connector.is_available().await); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_cluster_helpers() { | ||
use cluster_helpers::*; | ||
use orion_configuration::config::cluster::InternalEndpointAddress; | ||
|
||
let internal_addr = InternalEndpointAddress { | ||
server_listener_name: String::from("test_listener").into(), | ||
endpoint_id: Some(String::from("endpoint1").into()), | ||
}; | ||
|
||
let connector = create_internal_connector(&internal_addr, "test_cluster"); | ||
assert_eq!(connector.cluster_name(), "test_cluster"); | ||
assert_eq!(connector.listener_name(), "test_listener"); | ||
|
||
assert!(!is_internal_listener_available("non_existent").await); | ||
|
||
let stats = get_internal_connection_stats().await; | ||
assert_eq!(stats.active_listeners, 0); | ||
|
||
let listeners = list_internal_listeners().await; | ||
assert!(listeners.is_empty()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use :Ipv4Addr::LOCALHOST here
In order to make it easy to understand, i think you should first write a proposal following https://github.com/kmesh-net/kmesh/blob/main/docs/proposal/template.md
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've replaced the
LOCALHOST:0
approach with a special reserved address range