feat: implement internal connection factory for in-process communication #112
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of these files; the full list of commands accepted by this bot can be found here.
Code Review
This pull request introduces a foundational implementation for an internal connection factory, enabling in-process communication between different parts of the application. The changes include new components for creating and managing internal connections, listeners, and connectors, along with integration into the existing listener management logic. While the overall structure is sound, I've identified a critical bug in the listener lifecycle management that would prevent the feature from working as intended. Additionally, there are some missing implementations for data transfer and areas for improvement in the tests. My review provides specific feedback and suggestions to address these points.
orion-lib/src/listeners/listener.rs
Outdated
info!("Internal listener '{}' registered with connection factory", name);

let _listener_ref = std::sync::Arc::new(());
This line introduces a critical bug. It shadows the _listener_ref returned from factory.register_listener, causing the original Arc to be dropped. As a result, the Weak reference in the InternalListenerHandle will see a strong count of 0, and is_alive() will always return false. This effectively makes the listener unreachable for new connections immediately after registration.
Please remove this line. The _listener_ref from register_listener on line 349 must be kept in scope for the lifetime of the listener task to correctly manage its lifecycle.
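A minimal sketch of the intended lifecycle, assuming register_listener returns a (handle, receiver, guard) triple and that moving the guard into the listener task keeps it alive — the function and type names here are illustrative, not the crate's exact API:

```rust
// Hypothetical sketch: the guard returned by register_listener must live
// as long as the listener task, so the factory's Weak::upgrade
// (used by is_alive()) keeps succeeding.
async fn start_internal_listener(
    factory: &InternalConnectionFactory,
    name: &str,
) -> Result<(), Error> {
    let (_handle, mut receiver, listener_ref) =
        factory.register_listener(name.to_string()).await?;

    tokio::spawn(async move {
        // Moving `listener_ref` into the task ties the listener's liveness
        // to the task; dropping it early would make is_alive() return false.
        let _listener_ref = listener_ref;
        while let Some(connection) = receiver.recv().await {
            handle_internal_connection(connection).await;
        }
    });
    Ok(())
}
```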
impl AsyncRead for InternalStream {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        _buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        std::task::Poll::Pending
    }
}
The AsyncRead and AsyncWrite implementations for InternalStream (and InternalStreamWrapper) are currently placeholders that always return Poll::Pending. This prevents any actual data transfer over the internal connections, making the feature fundamentally incomplete.
To make this functional, you should implement proper in-memory stream logic. A good approach would be to use tokio::io::duplex to create a pair of connected in-memory byte streams for each internal connection.
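A minimal sketch of the duplex-based approach, assuming InternalStream can own one end of the pair — the struct shape and field names are illustrative:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream, ReadBuf};

// Hypothetical: each side of an internal connection owns one end of a duplex pair.
struct InternalStream {
    io: DuplexStream,
}

fn new_internal_pair(buffer_size: usize) -> (InternalStream, InternalStream) {
    let (a, b) = tokio::io::duplex(buffer_size);
    (InternalStream { io: a }, InternalStream { io: b })
}

impl AsyncRead for InternalStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Delegate to the in-memory duplex stream instead of returning Pending.
        Pin::new(&mut self.io).poll_read(cx, buf)
    }
}

impl AsyncWrite for InternalStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        Pin::new(&mut self.io).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.io).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.io).poll_shutdown(cx)
    }
}
```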
assert_eq!(stats.max_pooled_connections, 0);

let listeners = list_internal_listeners().await;
assert!(listeners.is_empty() || !listeners.is_empty());
This assertion assert!(listeners.is_empty() || !listeners.is_empty()); is a tautology and will always pass, which means this part of the test is not actually verifying anything.
A more meaningful test would involve registering a listener and then asserting that list_internal_listeners() contains its name. For example:
let factory = global_internal_connection_factory();
let listener_name = "my-test-listener";
let (_h, _r, _g) = factory.register_listener(listener_name.to_string()).await.unwrap();
let listeners = list_internal_listeners().await;
assert!(listeners.contains(&listener_name.to_string()));
factory.unregister_listener(listener_name).await.unwrap();
Also, consider that using a global factory makes tests dependent on each other. For better isolation, tests could create their own InternalConnectionFactory instances, as sketched below.
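A sketch of the isolated variant, assuming InternalConnectionFactory exposes a constructor and instance methods mirroring the global helpers (the method names here are hypothetical):

```rust
#[tokio::test]
async fn test_listener_registry_isolated() {
    // Hypothetical: a per-test factory avoids cross-test interference
    // from the global singleton.
    let factory = InternalConnectionFactory::new();
    let listener_name = "my-test-listener";

    let (_handle, _receiver, _guard) =
        factory.register_listener(listener_name.to_string()).await.unwrap();

    // Query the factory instance directly rather than the global helper.
    let listeners = factory.list_listeners().await;
    assert!(listeners.contains(&listener_name.to_string()));

    factory.unregister_listener(listener_name).await.unwrap();
    assert!(factory.list_listeners().await.is_empty());
}
```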
orion-lib/src/listeners/listener.rs
Outdated
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
Using a hardcoded SocketAddr with port 0 for internal connections might cause issues with filter chain matching, as some filter chains may match on specific destination ports. This makes the current implementation brittle.
Consider introducing a new variant to DownstreamConnectionMetadata, such as FromInternal, to more accurately represent the origin of the connection and allow for more flexible filter chain matching logic for internal listeners.
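A sketch of what such a variant could look like, assuming DownstreamConnectionMetadata is an enum over connection origins (field shapes are illustrative; the FromInternal fields match the ones used later in this PR):

```rust
// Hypothetical shape of the metadata enum with an internal-origin variant.
enum DownstreamConnectionMetadata {
    FromSocket {
        local_address: std::net::SocketAddr,
        peer_address: std::net::SocketAddr,
    },
    // Internal connections carry no real socket addresses; they identify
    // themselves by listener name and optional endpoint instead.
    FromInternal {
        listener_name: String,
        endpoint_id: Option<String>,
    },
}
```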
#[tokio::test]
async fn test_concurrent_operations() {
    let factory = global_internal_connection_factory();

    let listener_name = "test_concurrent_listener_global";
    let (_handle, mut connection_receiver, _listener_ref) =
        factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener");

    let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None);

    let connection_future = connector.connect();
    let listener_future = connection_receiver.recv();

    let (connection_result, listener_result) = tokio::join!(connection_future, listener_future);

    assert!(connection_result.is_ok(), "Connection failed: {:?}", connection_result.err());
    assert!(listener_result.is_some(), "Listener didn't receive connection");

    factory.unregister_listener(listener_name).await.expect("Failed to unregister listener");
}
The test test_concurrent_operations appears to be redundant, as its logic is already covered by test_complete_internal_connection_flow. Both tests set up a listener and a connector, and then concurrently connect and receive. To improve maintainability and avoid duplicated test code, consider removing this test.
Force-pushed de26d09 to 84642f8
- InternalConnectionFactory with thread-safe listener registry
- InternalListenerHandle for connection management and lifecycle
- InternalClusterConnector for cluster-side connections
- InternalChannelConnector with cluster context
- Global singleton factory accessible across the codebase
- Comprehensive connection metadata and statistics

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
Force-pushed 84642f8 to 7574a84
Before this, we should always write a proposal; otherwise it's quite tricky to understand.
Self::FromSocket { local_address, .. } => *local_address,
Self::FromProxyProtocol { original_destination_address, .. } => *original_destination_address,
Self::FromTlv { original_destination_address, .. } => *original_destination_address,
Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
Why use Ipv4Addr::LOCALHOST here?
To make this easy to understand, I think you should first write a proposal following https://github.com/kmesh-net/kmesh/blob/main/docs/proposal/template.md
I've replaced the LOCALHOST:0 approach with a special reserved address range.
orion-lib/src/listeners/listener.rs
Outdated
},
};

info!("Internal listener '{}' registered with connection factory", name);
info!("Internal listener '{}' registered with connection factory", name); | |
info!("Internal listener '{}' registered to connection factory", name); |
Updated that.
orion-lib/src/listeners/listener.rs
Outdated
let filter_chains_clone = filter_chains.clone();
let listener_name = name.to_string();

tokio::spawn(async move {
This is costly if the number of short-lived connections is very large. Could we use a pool here? (See the sketch below.)
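A sketch of one way to do this, assuming a fixed set of worker tasks pulling accepted connections off a shared mpsc channel instead of spawning a task per connection — the connection type and handler are stand-ins for the PR's real ones:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

// Hypothetical connection type and handler; stand-ins for the PR's real ones.
struct InternalConnection;
async fn handle_internal_connection(_conn: InternalConnection) { /* filter chain processing */ }

struct InternalConnectionWorkerPool {
    tx: mpsc::Sender<InternalConnection>,
}

impl InternalConnectionWorkerPool {
    fn new(workers: usize) -> Self {
        // Bounded queue gives backpressure when all workers are busy.
        let (tx, rx) = mpsc::channel::<InternalConnection>(1024);
        let rx = Arc::new(Mutex::new(rx));
        for _ in 0..workers {
            let rx = Arc::clone(&rx);
            tokio::spawn(async move {
                // Each worker loops on the shared queue instead of the
                // listener spawning one task per accepted connection.
                loop {
                    let conn = rx.lock().await.recv().await;
                    match conn {
                        Some(conn) => handle_internal_connection(conn).await,
                        None => break, // channel closed: pool shutting down
                    }
                }
            });
        }
        Self { tx }
    }

    async fn dispatch(&self, conn: InternalConnection) -> bool {
        // Awaits when the queue is full, propagating backpressure upstream.
        self.tx.send(conn).await.is_ok()
    }
}
```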
Updated that.
orion-lib/src/listeners/listener.rs
Outdated
let downstream_metadata = DownstreamConnectionMetadata::FromInternal {
    listener_name: listener_name.clone(),
    endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(),
I don't quite understand these fields.
These fields identify internal connections:
- listener_name: which internal listener accepted the connection (e.g., "waypoint_inbound")
- endpoint_id: optional ID for load balancing when multiple endpoints exist (e.g., "pod-123")
Since internal connections use in-memory channels (not real sockets), these fields replace traditional IP/port-based identification for routing and filter chain selection.
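For illustration, filter chain selection could then match on these fields rather than on addresses — a hedged sketch assuming the FromInternal variant shape used in this PR; the actual matching logic may differ:

```rust
// Hypothetical: select a filter chain for an internal connection by
// listener name (and optionally endpoint) instead of destination IP/port.
fn select_filter_chain_name(metadata: &DownstreamConnectionMetadata) -> Option<String> {
    match metadata {
        DownstreamConnectionMetadata::FromInternal { listener_name, endpoint_id } => {
            // e.g. route "waypoint_inbound" traffic, optionally per endpoint.
            Some(match endpoint_id {
                Some(ep) => format!("{listener_name}/{ep}"),
                None => listener_name.clone(),
            })
        },
        // Socket-based origins keep the existing address/port matching.
        _ => None,
    }
}
```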
Okay, will write a proposal for this, and for future items as well.
- InternalConnectionWorkerPool with efficient task distribution
- Special address range for internal connections (127.255.255.x)
- Enhanced connection metadata with listener_name and endpoint_id
- Performance optimization replacing tokio::spawn overhead

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
Pull Request Overview
This PR implements internal connection factory functionality for in-process communication in Orion, enabling waypoint proxy capabilities for ambient service mesh deployments. The implementation provides thread-safe listener registry, connection management, and cluster-side connectivity for internal endpoints.
- Internal connection factory with thread-safe listener registration and lifecycle management
- Internal cluster connectors for establishing connections to internal listeners
- Enhanced listener runtime to handle internal connections through filter chains
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Summary per file:
File | Description |
---|---|
orion-lib/tests/internal_connection_integration.rs | Comprehensive integration tests for internal connection factory and cluster connectivity |
orion-lib/src/transport/mod.rs | Module exports for internal connection and cluster connector types |
orion-lib/src/transport/internal_connection.rs | Core internal connection factory with thread-safe listener registry and connection handling |
orion-lib/src/transport/internal_cluster_connector.rs | Cluster-side connectors for establishing connections to internal listeners |
orion-lib/src/listeners/listeners_manager.rs | Minor import changes with conditional compilation for test configuration |
orion-lib/src/listeners/listener.rs | Enhanced internal listener runtime with connection handling and worker pool |
orion-lib/src/listeners/filter_state.rs | Extended downstream connection metadata to support internal connections |
orion-lib/src/lib.rs | Public API exports for internal connection functionality |
docs/proposal/internal-listener-and-upstream-transport.md | Design proposal and documentation for internal listener feature |
    _cx: &mut std::task::Context<'_>,
    _buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
    // InternalStreamWrapper is read-only - actual I/O happens in InternalStream
    std::task::Poll::Pending
Copilot AI · Oct 3, 2025
The InternalStreamWrapper AsyncRead implementation always returns Pending, which will cause consumers to hang indefinitely when trying to read. This should either delegate to the underlying stream or return an appropriate error.
Suggested change:
-    _cx: &mut std::task::Context<'_>,
-    _buf: &mut tokio::io::ReadBuf<'_>,
-) -> std::task::Poll<std::io::Result<()>> {
-    // InternalStreamWrapper is read-only - actual I/O happens in InternalStream
-    std::task::Poll::Pending
+    cx: &mut std::task::Context<'_>,
+    buf: &mut tokio::io::ReadBuf<'_>,
+) -> std::task::Poll<std::io::Result<()>> {
+    // Delegate to the underlying InternalStream's poll_read.
+    // Safety: InternalStreamWrapper is a newtype over Arc<InternalStream>, so we need a mutable reference to InternalStream.
+    // However, Arc does not allow mutable access, so if InternalStream does not support AsyncRead, return an error.
+    std::task::Poll::Ready(Err(std::io::Error::new(
+        std::io::ErrorKind::Unsupported,
+        "InternalStreamWrapper does not support reading directly",
+    )))
    _cx: &mut std::task::Context<'_>,
    _buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
    std::task::Poll::Pending
Copilot AI · Oct 3, 2025
The InternalStreamWrapper AsyncWrite implementation always returns Pending, which will cause consumers to hang indefinitely when trying to write. This should either delegate to the underlying stream or return an appropriate error.
Suggested change:
-    std::task::Poll::Pending
+    std::task::Poll::Ready(Err(std::io::Error::new(
+        std::io::ErrorKind::Unsupported,
+        "InternalStreamWrapper does not support writing",
+    )))
orion-lib/src/listeners/listener.rs
Outdated
        }
    }
});
drop(worker);
Copilot AI · Oct 3, 2025
Dropping the spawned worker task handle immediately will cause the worker threads to be orphaned and potentially terminated early. The handles should be stored to ensure proper cleanup.
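A sketch of storing the handles so the pool can shut its workers down explicitly (illustrative shape; note that dropping a tokio JoinHandle only detaches the task rather than killing it, so the main benefit here is explicit shutdown):

```rust
use tokio::task::JoinHandle;

// Hypothetical: keep worker handles so shutdown is explicit instead of
// detaching workers by dropping their JoinHandles.
struct WorkerPool {
    handles: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    async fn shutdown(self) {
        for handle in self.handles {
            // Abort (or simply await, if workers exit when their input
            // channel closes) each worker task.
            handle.abort();
            let _ = handle.await; // a cancelled JoinError is expected here
        }
    }
}
```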
}

#[derive(Debug, Clone)]
struct InternalListenerConfig {
Copilot AI · Oct 3, 2025
The buffer_size_kb field is marked as dead_code but appears to be part of the public API for InternalListenerConfig. Consider implementing its usage or documenting why it's currently unused.
Suggested change:
-struct InternalListenerConfig {
+struct InternalListenerConfig {
+    /// Optional buffer size in kilobytes for internal listeners.
+    /// This field is currently unused, but is retained for compatibility with configuration schemas
+    /// and for potential future use. See issue #TODO for details.
const INTERNAL_PEER_ADDR: std::net::SocketAddr =
    std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));
Copilot AI · Oct 3, 2025
Consider using constant values or documented ranges for the internal addresses (127.255.255.254:65534 and 127.255.255.255:65535) to make it clear these are reserved for internal use and avoid potential conflicts.
Suggested change:
-const INTERNAL_PEER_ADDR: std::net::SocketAddr =
-    std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));
+/// Reserved internal address used as the peer address for in-process (internal) connections.
+/// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic.
+const INTERNAL_PEER_ADDR: std::net::SocketAddr =
+    std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));
+/// Reserved internal address used as the local address for in-process (internal) connections.
+/// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic.
…ixes

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
Force-pushed 5e5d00b to d32ae90
Pull Request Overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.
orion-lib/src/listeners/listener.rs
Outdated
#[allow(dead_code)]
buffer_size_kb: Option<u32>,
Copilot AI · Oct 3, 2025
The buffer_size_kb field is marked as dead code but appears to be part of the configuration structure. Consider either implementing its usage or removing it if it's not needed for the current implementation phase.
orion-lib/src/listeners/listener.rs
Outdated
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Ok(())
},
crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
    info!("Processing internal connection through TCP filter chain");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Copilot AI · Oct 3, 2025
The hardcoded sleep durations in connection handlers appear to be placeholder implementations. These should be replaced with actual processing logic or removed if they're temporary debugging code.
Suggested change:
-    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-    Ok(())
-},
-crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
-    info!("Processing internal connection through TCP filter chain");
-    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+    Ok(())
+},
+crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
+    info!("Processing internal connection through TCP filter chain");
orion-lib/src/listeners/listener.rs
Outdated
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Ok(())
},
crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
    info!("Processing internal connection through TCP filter chain");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Copilot AI · Oct 3, 2025
The hardcoded sleep durations in connection handlers appear to be placeholder implementations. These should be replaced with actual processing logic or removed if they're temporary debugging code.
Suggested change:
-    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-    Ok(())
-},
-crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
-    info!("Processing internal connection through TCP filter chain");
-    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+    Ok(())
+},
+crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
+    info!("Processing internal connection through TCP filter chain");
warn!("No matching filter chain found for internal connection");
return Err(crate::Error::new("No matching filter chain"));
};
Copilot AI · Oct 3, 2025
The downstream stream is assigned to an unused variable. This suggests incomplete implementation of the connection handling logic. Consider implementing the actual stream processing or adding a TODO comment explaining the planned implementation.
Suggested addition:
+ // TODO: Implement actual processing of the downstream stream for internal connections.
orion-internal/src/connection.rs
Outdated
}

fn create_internal_connection_pair(metadata: InternalConnectionMetadata) -> (Arc<InternalStream>, Arc<InternalStream>) {
    let (upstream_io, downstream_io) = tokio::io::duplex(1024);
Copilot AI · Oct 3, 2025
The buffer size is hardcoded to 1024 bytes. Consider making this configurable or using a named constant to make the buffer size more maintainable and tunable.
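A small sketch of the configurable variant (the constant name, default value, and config plumbing are illustrative assumptions, not the PR's actual API):

```rust
// Hypothetical default; the real value should come from listener configuration.
const DEFAULT_INTERNAL_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB

fn create_internal_connection_pair_with(
    buffer_size: Option<usize>,
) -> (tokio::io::DuplexStream, tokio::io::DuplexStream) {
    // Fall back to the named default instead of a magic number.
    tokio::io::duplex(buffer_size.unwrap_or(DEFAULT_INTERNAL_BUFFER_SIZE))
}
```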
@Eeshu-Yadav, thanks for the valuable work. I think it would be good to keep the internal listener work in a separate crate, orion-internal or something like that. Could we move all the internal-related structs and logic there?
filter_chains: Arc<HashMap<FilterChainMatch, FilterchainType>>,
}

impl InternalConnectionWorkerPool {
Move all internal-connection-related tasks to a separate mod/crate.
Okay, will update that.
Moved all internal connection related code (factories, connectors, streams, tests) from orion-lib to a dedicated orion-internal crate.

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
Pull Request Overview
Copilot reviewed 20 out of 21 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (2)
orion-lib/src/listeners/listener.rs:1
- Improved closure usage by using the function pointer std::vec::Vec::len instead of the closure |v| v.len() for better performance and readability.
// Copyright 2025 The kmesh Authors
orion-configuration/src/config/cluster.rs:1
- Updated panic message to use string interpolation for better readability and consistency with modern Rust formatting practices.
// Copyright 2025 The kmesh Authors
Force-pushed c1d8e9c to 95eddc9
Force-pushed 95eddc9 to 7535361
Pull Request Overview
Copilot reviewed 20 out of 21 changed files in this pull request and generated 3 comments.
orion-lib/src/listeners/listener.rs
Outdated
fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
    INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default
Copilot AI · Oct 5, 2025
The static worker pool is hardcoded to 4 workers. Consider making this configurable through runtime configuration to allow tuning based on workload characteristics.
Suggested change:
-fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
-    INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default
+/// Initialize the internal worker pool with a configurable number of workers.
+/// This should be called once at startup, before any calls to `get_internal_worker_pool`.
+pub fn init_internal_worker_pool(num_workers: Option<usize>) {
+    let workers = num_workers.or_else(|| {
+        std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+    }).unwrap_or(4); // Default to 4 if not specified
+    let _ = INTERNAL_WORKER_POOL.set(InternalConnectionWorkerPool::new(workers));
+}
+fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
+    INTERNAL_WORKER_POOL.get_or_init(|| {
+        let workers = std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .unwrap_or(4);
+        InternalConnectionWorkerPool::new(workers)
+    })
orion-lib/src/listeners/listener.rs
Outdated
// TODO: Implement actual processing of the downstream stream for internal connections.
// The downstream stream should be passed to the filter chain handler for proper processing.
let _downstream_stream = connection_pair.downstream;

match &filter_chain.handler {
    crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => {
        info!("Processing internal connection through HTTP filter chain");
        // TODO: Implement actual HTTP connection processing through the filter chain
        // This should involve setting up HTTP/1.1 or HTTP/2 protocol handling,
        // routing requests through the configured route table, and applying filters.
        Ok(())
    },
    crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
        info!("Processing internal connection through TCP filter chain");
        // TODO: Implement actual TCP connection processing through the filter chain
        // This should involve proxying the TCP connection to the configured upstream
        // cluster and handling bidirectional data transfer.
        Ok(())
Copilot AI · Oct 5, 2025
The downstream stream is intentionally unused as indicated by the TODO comments. This represents incomplete functionality that should be tracked and completed.
Suggested change:
-// TODO: Implement actual processing of the downstream stream for internal connections.
-// The downstream stream should be passed to the filter chain handler for proper processing.
-let _downstream_stream = connection_pair.downstream;
-match &filter_chain.handler {
-    crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => {
-        info!("Processing internal connection through HTTP filter chain");
-        // TODO: Implement actual HTTP connection processing through the filter chain
-        // This should involve setting up HTTP/1.1 or HTTP/2 protocol handling,
-        // routing requests through the configured route table, and applying filters.
-        Ok(())
-    },
-    crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
-        info!("Processing internal connection through TCP filter chain");
-        // TODO: Implement actual TCP connection processing through the filter chain
-        // This should involve proxying the TCP connection to the configured upstream
-        // cluster and handling bidirectional data transfer.
-        Ok(())
+// Pass the downstream stream to the filter chain handler for processing.
+let downstream_stream = connection_pair.downstream;
+match &filter_chain.handler {
+    crate::listeners::filterchain::ConnectionHandler::Http(http_manager) => {
+        info!("Processing internal connection through HTTP filter chain");
+        // Call the HTTP manager's handler method with the downstream stream and metadata.
+        // TODO: Replace this placeholder with actual HTTP connection processing logic.
+        http_manager.handle_internal_connection(downstream_stream, downstream_metadata)
+    },
+    crate::listeners::filterchain::ConnectionHandler::Tcp(tcp_proxy) => {
+        info!("Processing internal connection through TCP filter chain");
+        // Call the TCP proxy's handler method with the downstream stream and metadata.
+        // TODO: Replace this placeholder with actual TCP connection processing logic.
+        tcp_proxy.handle_internal_connection(downstream_stream, downstream_metadata)
orion-internal/src/connection.rs
Outdated
    }
}

const DEFAULT_BUFFER_SIZE: usize = 1024;
Copilot AI · Oct 5, 2025
[nitpick] The default buffer size of 1KB may be too small for high-throughput internal connections. Consider using a larger default or making it configurable.
Suggested change:
- const DEFAULT_BUFFER_SIZE: usize = 1024;
+ const DEFAULT_BUFFER_SIZE: usize = 8192;
…, add TODOs

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
Force-pushed 7535361 to 33e8c58
@hzxuzhonghu added the proposal in the desired location here. Kindly review.
@dawid-nowak kindly review this.
Fixes: Internal Connection Factory Implementation #105, Enhanced Internal Listener Runtime #106