
Conversation

Eeshu-Yadav
Contributor

@Eeshu-Yadav Eeshu-Yadav commented Sep 28, 2025

@kmesh-bot
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign hzxuzhonghu for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Contributor

Summary of Changes

Hello @Eeshu-Yadav, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a foundational internal communication system within the orion-lib crate. It enables components to communicate directly within the same process without the overhead of network operations. This system provides a structured way to register internal listeners, establish connections, and manage their lifecycle, laying the groundwork for more efficient inter-component interactions.

Highlights

  • Internal Connection Factory: Introduced a new InternalConnectionFactory with a thread-safe listener registry to manage in-process communication.
  • Listener Management: Implemented InternalListenerHandle for robust connection management and lifecycle control of internal listeners.
  • Cluster and Channel Connectors: Added InternalClusterConnector for establishing cluster-side connections and InternalChannelConnector which provides cluster context for internal communication.
  • Global Singleton Access: A global singleton factory is now accessible across the codebase, simplifying the retrieval and use of the internal connection factory.
  • Connection Handling Integration: The Listener component has been updated to register with the internal connection factory and actively handle incoming internal connections, processing them through existing filter chains.
  • Comprehensive Testing: New unit and integration tests have been added to validate the functionality, lifecycle, and error handling of the internal connection mechanism.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature | Command | Description
Code Review | /gemini review | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state.
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help | /gemini help | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a foundational implementation for an internal connection factory, enabling in-process communication between different parts of the application. The changes include new components for creating and managing internal connections, listeners, and connectors, along with integration into the existing listener management logic. While the overall structure is sound, I've identified a critical bug in the listener lifecycle management that would prevent the feature from working as intended. Additionally, there are some missing implementations for data transfer and areas for improvement in the tests. My review provides specific feedback and suggestions to address these points.


info!("Internal listener '{}' registered with connection factory", name);

let _listener_ref = std::sync::Arc::new(());
Contributor

critical

This line introduces a critical bug. It shadows the _listener_ref returned from factory.register_listener, causing the original Arc to be dropped. As a result, the Weak reference in the InternalListenerHandle will see a strong count of 0, and is_alive() will always return false. This effectively makes the listener unreachable for new connections immediately after registration.

Please remove this line. The _listener_ref from register_listener on line 349 must be kept in scope for the lifetime of the listener task to correctly manage its lifecycle.
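
A minimal standalone sketch of the lifecycle issue being described (the names here are illustrative, not taken from the PR): once the last strong Arc is dropped, any Weak held by the handle can no longer upgrade, so an is_alive() check based on the strong count reports the listener as dead.

use std::sync::{Arc, Weak};

fn main() {
    // The factory hands back a strong Arc; the listener handle keeps only a Weak.
    let listener_ref: Arc<()> = Arc::new(());
    let weak_ref: Weak<()> = Arc::downgrade(&listener_ref);

    // While the strong Arc is alive, the listener counts as reachable.
    assert_eq!(weak_ref.strong_count(), 1);
    assert!(weak_ref.upgrade().is_some()); // an is_alive() check would report true

    // Shadowing (or otherwise dropping) the strong Arc kills it immediately.
    drop(listener_ref);
    assert_eq!(weak_ref.strong_count(), 0);
    assert!(weak_ref.upgrade().is_none()); // an is_alive() check would report false
}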

Comment on lines 117 to 126
impl AsyncRead for InternalStream {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        _buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        std::task::Poll::Pending
    }
}
Contributor

high

The AsyncRead and AsyncWrite implementations for InternalStream (and InternalStreamWrapper) are currently placeholders that always return Poll::Pending. This prevents any actual data transfer over the internal connections, making the feature fundamentally incomplete.

To make this functional, you should implement proper in-memory stream logic. A good approach would be to use tokio::io::duplex to create a pair of connected in-memory byte streams for each internal connection.
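
A minimal sketch of the tokio::io::duplex approach being suggested; the buffer size and variable names are illustrative, not taken from the PR:

use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // duplex() returns two connected in-memory streams: bytes written to one
    // side become readable on the other. 4096 is the in-memory buffer size.
    let (mut downstream, mut upstream) = duplex(4096);

    // The connector side writes a request and closes its half.
    upstream.write_all(b"hello from the internal connector").await?;
    drop(upstream); // dropping one side signals EOF to the other

    // The listener side reads until EOF.
    let mut received = Vec::new();
    downstream.read_to_end(&mut received).await?;
    assert_eq!(received, b"hello from the internal connector");
    Ok(())
}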

assert_eq!(stats.max_pooled_connections, 0);

let listeners = list_internal_listeners().await;
assert!(listeners.is_empty() || !listeners.is_empty());
Contributor

high

This assertion assert!(listeners.is_empty() || !listeners.is_empty()); is a tautology and will always pass, which means this part of the test is not actually verifying anything.

A more meaningful test would involve registering a listener and then asserting that list_internal_listeners() contains its name. For example:

let factory = global_internal_connection_factory();
let listener_name = "my-test-listener";
let (_h, _r, _g) = factory.register_listener(listener_name.to_string()).await.unwrap();
let listeners = list_internal_listeners().await;
assert!(listeners.contains(&listener_name.to_string()));
factory.unregister_listener(listener_name).await.unwrap();

Also, consider that using a global factory makes tests dependent on each other. For better isolation, tests could create their own InternalConnectionFactory instances.
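
A hedged sketch of the kind of isolated test being suggested. The register_listener/unregister_listener calls mirror the API quoted above; the InternalConnectionFactory::new() constructor and list_listeners() method are assumptions for illustration, not confirmed against the PR.

// Hypothetical isolated test: constructor and list_listeners name are assumed.
#[tokio::test]
async fn test_listener_registry_isolated() {
    let factory = InternalConnectionFactory::new();

    let (_handle, _receiver, _listener_ref) = factory
        .register_listener("my-test-listener".to_string())
        .await
        .expect("registration should succeed");

    let listeners = factory.list_listeners().await;
    assert!(listeners.contains(&"my-test-listener".to_string()));

    factory.unregister_listener("my-test-listener").await.expect("unregister should succeed");
    assert!(!factory.list_listeners().await.contains(&"my-test-listener".to_string()));
}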

Comment on lines 434 to 435
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
Contributor

medium

Using hardcoded SocketAddr with port 0 for internal connections might cause issues with filter chain matching, as some filter chains might match on specific destination ports. This makes the current implementation brittle.

Consider introducing a new variant to DownstreamConnectionMetadata, such as FromInternal, to more accurately represent the origin of the connection and allow for more flexible filter chain matching logic for internal listeners.

Comment on lines 155 to 174
#[tokio::test]
async fn test_concurrent_operations() {
    let factory = global_internal_connection_factory();

    let listener_name = "test_concurrent_listener_global";
    let (_handle, mut connection_receiver, _listener_ref) =
        factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener");

    let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None);

    let connection_future = connector.connect();
    let listener_future = connection_receiver.recv();

    let (connection_result, listener_result) = tokio::join!(connection_future, listener_future);

    assert!(connection_result.is_ok(), "Connection failed: {:?}", connection_result.err());
    assert!(listener_result.is_some(), "Listener didn't receive connection");

    factory.unregister_listener(listener_name).await.expect("Failed to unregister listener");
}
Contributor

medium

The test test_concurrent_operations appears to be redundant as its logic is already covered by test_complete_internal_connection_flow. Both tests set up a listener and a connector, and then concurrently connect and receive. To improve maintainability and avoid duplicated test code, consider removing this test.

@Eeshu-Yadav Eeshu-Yadav force-pushed the feat/internal-connection-factory branch 5 times, most recently from de26d09 to 84642f8 Compare September 28, 2025 09:37
- InternalConnectionFactory with thread-safe listener registry
- InternalListenerHandle for connection management and lifecycle
- InternalClusterConnector for cluster-side connections
- InternalChannelConnector with cluster context
- Global singleton factory accessible across the codebase
- Comprehensive connection metadata and statistics

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
@Eeshu-Yadav Eeshu-Yadav force-pushed the feat/internal-connection-factory branch from 84642f8 to 7574a84 Compare September 28, 2025 09:54
Member

@hzxuzhonghu hzxuzhonghu left a comment


Before this, we should always write a proposal. Otherwise it's quite tricky to understand.

FYI https://github.com/kubernetes/enhancements/tree/master

Self::FromSocket { local_address, .. } => *local_address,
Self::FromProxyProtocol { original_destination_address, .. } => *original_destination_address,
Self::FromTlv { original_destination_address, .. } => *original_destination_address,
Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
Member

Why use Ipv4Addr::LOCALHOST here?

In order to make it easy to understand, i think you should first write a proposal following https://github.com/kmesh-net/kmesh/blob/main/docs/proposal/template.md

Contributor Author

I've replaced the LOCALHOST:0 approach with a special reserved address range.

},
};

info!("Internal listener '{}' registered with connection factory", name);
Member

Suggested change
info!("Internal listener '{}' registered with connection factory", name);
info!("Internal listener '{}' registered to connection factory", name);

Contributor Author

updated that

let filter_chains_clone = filter_chains.clone();
let listener_name = name.to_string();

tokio::spawn(async move {
Member

This is costly if the number of short connections is very large; could we use a pool here?
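
A minimal sketch of the pooling idea being raised here, not the PR's actual InternalConnectionWorkerPool: a fixed set of workers pulls accepted connections from an mpsc channel instead of spawning a task per connection. All type and function names below are illustrative.

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

// Stand-in for whatever an accepted internal connection carries in the PR.
struct InternalConnection {
    listener_name: String,
}

async fn process(conn: InternalConnection) {
    // Real code would run the connection through the matched filter chain here.
    println!("handled internal connection for listener {}", conn.listener_name);
}

struct WorkerPool {
    queue: mpsc::Sender<InternalConnection>,
}

impl WorkerPool {
    fn new(workers: usize) -> Self {
        let (queue, rx) = mpsc::channel::<InternalConnection>(1024);
        // A shared receiver lets a fixed number of workers compete for jobs.
        let rx = Arc::new(Mutex::new(rx));
        for _ in 0..workers {
            let rx = Arc::clone(&rx);
            tokio::spawn(async move {
                loop {
                    // Hold the lock only while waiting for the next job, not while handling it.
                    let next = {
                        let mut receiver = rx.lock().await;
                        receiver.recv().await
                    };
                    match next {
                        Some(conn) => process(conn).await,
                        None => break, // all senders dropped: the pool is shutting down
                    }
                }
            });
        }
        Self { queue }
    }

    async fn dispatch(&self, conn: InternalConnection) {
        // Applies back-pressure when the queue is full instead of spawning without bound.
        let _ = self.queue.send(conn).await;
    }
}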

Contributor Author

updated that


let downstream_metadata = DownstreamConnectionMetadata::FromInternal {
listener_name: listener_name.clone(),
endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(),
Member

I don't quite understand these fields.

Contributor Author

These fields identify internal connections:

  • listener_name: Which internal listener accepted the connection (e.g., "waypoint_inbound")
  • endpoint_id: Optional ID for load balancing when multiple endpoints exist (e.g., "pod-123")

Since internal connections use in-memory channels (not real sockets), these fields replace traditional IP/port-based identification for routing and filter chain selection.
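
A hedged sketch of what such a FromInternal variant might look like; the field names follow the description above, but this is not the exact definition from filter_state.rs:

use std::net::SocketAddr;

// Illustrative shape only; the real enum has more variants and fields.
enum DownstreamConnectionMetadata {
    FromSocket {
        local_address: SocketAddr,
        peer_address: SocketAddr,
    },
    FromInternal {
        // Which internal listener accepted the connection, e.g. "waypoint_inbound".
        listener_name: String,
        // Optional endpoint id used for load balancing, e.g. "pod-123".
        endpoint_id: Option<String>,
    },
}

impl DownstreamConnectionMetadata {
    fn filter_chain_key(&self) -> String {
        match self {
            // Socket connections keep matching on the real destination address.
            Self::FromSocket { local_address, .. } => local_address.to_string(),
            // Internal connections match on the listener name instead of IP/port.
            Self::FromInternal { listener_name, .. } => listener_name.clone(),
        }
    }
}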

@Eeshu-Yadav
Contributor Author

Before this, we should always write a proposal. Otherwise it's quite tricky to understand.

FYI https://github.com/kubernetes/enhancements/tree/master

Okay, I'll write a proposal for this, and for future changes as well.

- InternalConnectionWorkerPool with efficient task distribution
- Special address range for internal connections (127.255.255.x)
- Enhanced connection metadata with listener_name and endpoint_id
- Performance optimization replacing tokio::spawn overhead

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
@Copilot Copilot AI review requested due to automatic review settings October 3, 2025 03:46
@kmesh-bot kmesh-bot added size/XXL and removed size/XL labels Oct 3, 2025

@Copilot Copilot AI left a comment


Pull Request Overview

This PR implements internal connection factory functionality for in-process communication in Orion, enabling waypoint proxy capabilities for ambient service mesh deployments. The implementation provides thread-safe listener registry, connection management, and cluster-side connectivity for internal endpoints.

  • Internal connection factory with thread-safe listener registration and lifecycle management
  • Internal cluster connectors for establishing connections to internal listeners
  • Enhanced listener runtime to handle internal connections through filter chains

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file

File | Description
orion-lib/tests/internal_connection_integration.rs | Comprehensive integration tests for internal connection factory and cluster connectivity
orion-lib/src/transport/mod.rs | Module exports for internal connection and cluster connector types
orion-lib/src/transport/internal_connection.rs | Core internal connection factory with thread-safe listener registry and connection handling
orion-lib/src/transport/internal_cluster_connector.rs | Cluster-side connectors for establishing connections to internal listeners
orion-lib/src/listeners/listeners_manager.rs | Minor import changes with conditional compilation for test configuration
orion-lib/src/listeners/listener.rs | Enhanced internal listener runtime with connection handling and worker pool
orion-lib/src/listeners/filter_state.rs | Extended downstream connection metadata to support internal connections
orion-lib/src/lib.rs | Public API exports for internal connection functionality
docs/proposal/internal-listener-and-upstream-transport.md | Design proposal and documentation for internal listener feature

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines 245 to 249
_cx: &mut std::task::Context<'_>,
_buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
// InternalStreamWrapper is read-only - actual I/O happens in InternalStream
std::task::Poll::Pending
Copilot AI Oct 3, 2025

The InternalStreamWrapper AsyncRead implementation always returns Pending, which will cause consumers to hang indefinitely when trying to read. This should either delegate to the underlying stream or return an appropriate error.

Suggested change

Before:

    _cx: &mut std::task::Context<'_>,
    _buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // InternalStreamWrapper is read-only - actual I/O happens in InternalStream
        std::task::Poll::Pending

After:

    cx: &mut std::task::Context<'_>,
    buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // Delegate to the underlying InternalStream's poll_read
        // Safety: InternalStreamWrapper is a newtype over Arc<InternalStream>, so we need to get a mutable reference to InternalStream.
        // However, Arc does not allow mutable access, so if InternalStream does not support AsyncRead, return an error.
        std::task::Poll::Ready(Err(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "InternalStreamWrapper does not support reading directly",
        )))


_cx: &mut std::task::Context<'_>,
_buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
std::task::Poll::Pending
Copilot AI Oct 3, 2025

The InternalStreamWrapper AsyncWrite implementation always returns Pending, which will cause consumers to hang indefinitely when trying to write. This should either delegate to the underlying stream or return an appropriate error.

Suggested change

Before:

    std::task::Poll::Pending

After:

    std::task::Poll::Ready(Err(std::io::Error::new(
        std::io::ErrorKind::Unsupported,
        "InternalStreamWrapper does not support writing",
    )))


}
}
});
drop(worker);
Copilot AI Oct 3, 2025

Dropping the spawned worker task handle immediately will cause the worker threads to be orphaned and potentially terminated early. The handles should be stored to ensure proper cleanup.
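
A small sketch of the shape being suggested: the pool keeps its JoinHandles so it can abort or await the workers on shutdown. The names below are illustrative, not the PR's.

use tokio::task::JoinHandle;

struct WorkerSet {
    // Keeping the JoinHandles lets the owner await or abort the workers on shutdown,
    // instead of detaching them by dropping the handles right after spawn.
    workers: Vec<JoinHandle<()>>,
}

impl WorkerSet {
    fn spawn(count: usize) -> Self {
        let workers = (0..count)
            .map(|id| {
                tokio::spawn(async move {
                    // ... the worker's receive/process loop would live here ...
                    let _ = id;
                })
            })
            .collect();
        Self { workers }
    }

    async fn shutdown(self) {
        for handle in self.workers {
            handle.abort();
            let _ = handle.await; // a cancelled JoinError is expected here
        }
    }
}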


}

#[derive(Debug, Clone)]
struct InternalListenerConfig {
Copilot AI Oct 3, 2025

The buffer_size_kb field is marked as dead_code but appears to be part of the public API for InternalListenerConfig. Consider implementing its usage or documenting why it's currently unused.

Suggested change

Before:

    struct InternalListenerConfig {

After:

    struct InternalListenerConfig {
        /// Optional buffer size in kilobytes for internal listeners.
        /// This field is currently unused, but is retained for compatibility with configuration schemas
        /// and for potential future use. See issue #TODO for details.


Comment on lines 21 to 23

const INTERNAL_PEER_ADDR: std::net::SocketAddr =
std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));
Copilot AI Oct 3, 2025

Consider using constant values or documented ranges for the internal addresses (127.255.255.254:65534 and 127.255.255.255:65535) to make it clear these are reserved for internal use and avoid potential conflicts.

Suggested change

Before:

    const INTERNAL_PEER_ADDR: std::net::SocketAddr =
        std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));

After:

    /// Reserved internal address used as the peer address for in-process (internal) connections.
    /// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic.
    const INTERNAL_PEER_ADDR: std::net::SocketAddr =
        std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));
    /// Reserved internal address used as the local address for in-process (internal) connections.
    /// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic.


…ixes

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
@Eeshu-Yadav Eeshu-Yadav force-pushed the feat/internal-connection-factory branch from 5e5d00b to d32ae90 Compare October 3, 2025 04:32
@Copilot Copilot AI review requested due to automatic review settings October 3, 2025 04:32

@Copilot Copilot AI left a comment


Pull Request Overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.



Comment on lines 138 to 139
#[allow(dead_code)]
buffer_size_kb: Option<u32>,
Copilot AI Oct 3, 2025

The buffer_size_kb field is marked as dead code but appears to be part of the configuration structure. Consider either implementing its usage or removing it if it's not needed for the current implementation phase.


Comment on lines 1012 to 1017
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
},
crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
info!("Processing internal connection through TCP filter chain");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Copilot AI Oct 3, 2025

The hardcoded sleep durations in connection handlers appear to be placeholder implementations. These should be replaced with actual processing logic or removed if they're temporary debugging code.

Suggested change

Before:

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    },
    crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
        info!("Processing internal connection through TCP filter chain");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

After:

        Ok(())
    },
    crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
        info!("Processing internal connection through TCP filter chain");



warn!("No matching filter chain found for internal connection");
return Err(crate::Error::new("No matching filter chain"));
};

Copilot AI Oct 3, 2025

The downstream stream is assigned to an unused variable. This suggests incomplete implementation of the connection handling logic. Consider implementing the actual stream processing or adding a TODO comment explaining the planned implementation.

Suggested change
// TODO: Implement actual processing of the downstream stream for internal connections.


}

fn create_internal_connection_pair(metadata: InternalConnectionMetadata) -> (Arc<InternalStream>, Arc<InternalStream>) {
let (upstream_io, downstream_io) = tokio::io::duplex(1024);
Copilot AI Oct 3, 2025

The buffer size is hardcoded to 1024 bytes. Consider making this configurable or using a named constant to make the buffer size more maintainable and tunable.


Member

@dawid-nowak dawid-nowak left a comment


@Eeshu-Yadav, thanks for the valuable work. I think it would be good to keep the internal listener work in a separate crate, orion-internal or something like that. Could we move all the internal-related structs and logic there?

filter_chains: Arc<HashMap<FilterChainMatch, FilterchainType>>,
}

impl InternalConnectionWorkerPool {
Member

Move all internal connection related tasks to a separate mod/crate.

Contributor Author

Okay, will update that.

Moved all internal connection related code (factories, connectors, streams, tests)
from orion-lib to dedicated orion-internal crate.

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
@Copilot Copilot AI review requested due to automatic review settings October 5, 2025 12:19

@Copilot Copilot AI left a comment


Pull Request Overview

Copilot reviewed 20 out of 21 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (2)

orion-lib/src/listeners/listener.rs:1

  • Improved closure usage by using function pointer std::vec::Vec::len instead of closure |v| v.len() for better performance and readability.
// Copyright 2025 The kmesh Authors

orion-configuration/src/config/cluster.rs:1

  • Updated panic message to use string interpolation for better readability and consistency with modern Rust formatting practices.
// Copyright 2025 The kmesh Authors


@Eeshu-Yadav Eeshu-Yadav force-pushed the feat/internal-connection-factory branch from c1d8e9c to 95eddc9 Compare October 5, 2025 12:24
@Copilot Copilot AI review requested due to automatic review settings October 5, 2025 12:44
@Eeshu-Yadav Eeshu-Yadav force-pushed the feat/internal-connection-factory branch from 95eddc9 to 7535361 Compare October 5, 2025 12:44

@Copilot Copilot AI left a comment


Pull Request Overview

Copilot reviewed 20 out of 21 changed files in this pull request and generated 3 comments.



Comment on lines 114 to 115
fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default
Copilot AI Oct 5, 2025

The static worker pool is hardcoded to 4 workers. Consider making this configurable through runtime configuration to allow tuning based on workload characteristics.

Suggested change

Before:

    fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
        INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default

After:

    /// Initialize the internal worker pool with a configurable number of workers.
    /// This should be called once at startup, before any calls to `get_internal_worker_pool`.
    pub fn init_internal_worker_pool(num_workers: Option<usize>) {
        let workers = num_workers.or_else(|| {
            std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE")
                .ok()
                .and_then(|s| s.parse::<usize>().ok())
        }).unwrap_or(4); // Default to 4 if not specified
        let _ = INTERNAL_WORKER_POOL.set(InternalConnectionWorkerPool::new(workers));
    }

    fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
        INTERNAL_WORKER_POOL.get_or_init(|| {
            let workers = std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE")
                .ok()
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(4);
            InternalConnectionWorkerPool::new(workers)
        })


Comment on lines 1006 to 1023
// TODO: Implement actual processing of the downstream stream for internal connections.
// The downstream stream should be passed to the filter chain handler for proper processing.
let _downstream_stream = connection_pair.downstream;

match &filter_chain.handler {
    crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => {
        info!("Processing internal connection through HTTP filter chain");
        // TODO: Implement actual HTTP connection processing through the filter chain
        // This should involve setting up HTTP/1.1 or HTTP/2 protocol handling,
        // routing requests through the configured route table, and applying filters.
        Ok(())
    },
    crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
        info!("Processing internal connection through TCP filter chain");
        // TODO: Implement actual TCP connection processing through the filter chain
        // This should involve proxying the TCP connection to the configured upstream
        // cluster and handling bidirectional data transfer.
        Ok(())
Copilot AI Oct 5, 2025

The downstream stream is intentionally unused as indicated by the TODO comments. This represents incomplete functionality that should be tracked and completed.

Suggested change

Before:

    // TODO: Implement actual processing of the downstream stream for internal connections.
    // The downstream stream should be passed to the filter chain handler for proper processing.
    let _downstream_stream = connection_pair.downstream;
    match &filter_chain.handler {
        crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => {
            info!("Processing internal connection through HTTP filter chain");
            // TODO: Implement actual HTTP connection processing through the filter chain
            // This should involve setting up HTTP/1.1 or HTTP/2 protocol handling,
            // routing requests through the configured route table, and applying filters.
            Ok(())
        },
        crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
            info!("Processing internal connection through TCP filter chain");
            // TODO: Implement actual TCP connection processing through the filter chain
            // This should involve proxying the TCP connection to the configured upstream
            // cluster and handling bidirectional data transfer.
            Ok(())

After:

    // Pass the downstream stream to the filter chain handler for processing.
    let downstream_stream = connection_pair.downstream;
    match &filter_chain.handler {
        crate::listeners::filterchain::ConnectionHandler::Http(http_manager) => {
            info!("Processing internal connection through HTTP filter chain");
            // Call the HTTP manager's handler method with the downstream stream and metadata.
            // TODO: Replace this placeholder with actual HTTP connection processing logic.
            http_manager.handle_internal_connection(downstream_stream, downstream_metadata)
        },
        crate::listeners::filterchain::ConnectionHandler::Tcp(tcp_proxy) => {
            info!("Processing internal connection through TCP filter chain");
            // Call the TCP proxy's handler method with the downstream stream and metadata.
            // TODO: Replace this placeholder with actual TCP connection processing logic.
            tcp_proxy.handle_internal_connection(downstream_stream, downstream_metadata)


}
}

const DEFAULT_BUFFER_SIZE: usize = 1024;
Copilot AI Oct 5, 2025

[nitpick] The default buffer size of 1KB may be too small for high-throughput internal connections. Consider using a larger default or making it configurable.

Suggested change
const DEFAULT_BUFFER_SIZE: usize = 1024;
const DEFAULT_BUFFER_SIZE: usize = 8192;


…, add TODOs

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
@Eeshu-Yadav Eeshu-Yadav force-pushed the feat/internal-connection-factory branch from 7535361 to 33e8c58 Compare October 5, 2025 13:40
@Eeshu-Yadav
Contributor Author

@hzxuzhonghu added the proposal in the desired location here. Kindly review.

@Eeshu-Yadav
Contributor Author

@dawid-nowak kindly review this



Development

Successfully merging this pull request may close these issues.

Internal Connection Factory Implementation

4 participants