-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH]: Add heaptender client to compaction and make finishtask use this #5715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,8 @@ use chroma_error::ChromaError; | |
| use chroma_system::{Component, ComponentContext, Handler}; | ||
| use chroma_tracing::GrpcClientTraceService; | ||
| use chroma_types::chroma_proto::{ | ||
| log_service_client::LogServiceClient, query_executor_client::QueryExecutorClient, | ||
| heap_tender_service_client::HeapTenderServiceClient, log_service_client::LogServiceClient, | ||
| query_executor_client::QueryExecutorClient, | ||
| }; | ||
| use parking_lot::RwLock; | ||
| use std::{ | ||
|
|
@@ -171,6 +172,7 @@ pub struct ClientManager<T> { | |
| connections_per_node: usize, | ||
| connect_timeout_ms: u64, | ||
| request_timeout_ms: u64, | ||
| port: u16, | ||
| old_memberlist: Memberlist, | ||
| options: ClientOptions, | ||
| } | ||
|
|
@@ -184,6 +186,7 @@ where | |
| connections_per_node: usize, | ||
| connect_timeout_ms: u64, | ||
| request_timeout_ms: u64, | ||
| port: u16, | ||
| options: ClientOptions, | ||
| ) -> Self { | ||
| Self { | ||
|
|
@@ -192,6 +195,7 @@ where | |
| connections_per_node, | ||
| connect_timeout_ms, | ||
| request_timeout_ms, | ||
| port, | ||
| old_memberlist: Memberlist::new(), | ||
| options, | ||
| } | ||
|
|
@@ -233,8 +237,7 @@ where | |
| } | ||
|
|
||
| async fn add_ip_for_node(&mut self, ip: String, node: &str) { | ||
| // TODO: Configure the port | ||
| let ip_with_port = format!("http://{}:{}", ip, 50051); | ||
| let ip_with_port = format!("http://{}:{}", ip, self.port); | ||
propel-code-bot[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let endpoint = match Endpoint::from_shared(ip_with_port) { | ||
| Ok(endpoint) => endpoint | ||
| .connect_timeout(std::time::Duration::from_millis(self.connect_timeout_ms)) | ||
|
|
@@ -417,6 +420,17 @@ impl ClientFactory | |
| } | ||
| } | ||
|
|
||
| /// Lets `HeapTenderServiceClient` be built through the `ClientFactory` trait from a traced gRPC channel. | ||
| impl ClientFactory | ||
| for HeapTenderServiceClient<chroma_tracing::GrpcClientTraceService<tonic::transport::Channel>> | ||
| { | ||
| /// Wraps `channel` in a new `HeapTenderServiceClient`. | ||
| fn new_from_channel(channel: GrpcClientTraceService<Channel>) -> Self { | ||
| HeapTenderServiceClient::new(channel) | ||
| } | ||
| /// Forwards `max_size` to `self.max_decoding_message_size` and returns the result. | ||
| // NOTE(review): the callee has the same name as this trait method; it presumably resolves to the client's own inherent method (inherent methods take precedence over trait methods) rather than recursing — confirm. | ||
| fn max_decoding_message_size(self, max_size: usize) -> Self { | ||
| self.max_decoding_message_size(max_size) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod test { | ||
| use super::super::memberlist_provider::Member; | ||
|
|
@@ -436,6 +450,7 @@ mod test { | |
| 1, | ||
| 1000, | ||
| 1000, | ||
| 50051, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [BestPractice] This test hardcodes the port (`50051`). Context for Agents. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm fine with it. |
||
| ClientOptions::default(), | ||
| ); | ||
| (client_manager, client_assigner) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| use chroma_memberlist::config::CustomResourceMemberlistProviderConfig; | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| /// Configuration for the gRPC heap service client. | ||
| /// | ||
| /// Controls how heap tender service instances are discovered (via a memberlist) and connected to. Every field carries a serde default, so any of them may be omitted when deserializing. | ||
| #[derive(Deserialize, Clone, Serialize, Debug)] | ||
| pub struct GrpcHeapServiceConfig { | ||
| /// Whether the heap service client is enabled. Defaults to `false`. | ||
| #[serde(default = "GrpcHeapServiceConfig::default_enabled")] | ||
| pub enabled: bool, | ||
| /// Connection timeout in milliseconds. Defaults to 5000 ms. | ||
| #[serde(default = "GrpcHeapServiceConfig::default_connect_timeout_ms")] | ||
| pub connect_timeout_ms: u64, | ||
| /// Request timeout in milliseconds. Defaults to 5000 ms. | ||
| #[serde(default = "GrpcHeapServiceConfig::default_request_timeout_ms")] | ||
| pub request_timeout_ms: u64, | ||
| /// Maximum message size for encoding. Defaults to 32 MB (`32_000_000` bytes). | ||
| #[serde(default = "GrpcHeapServiceConfig::default_max_encoding_message_size")] | ||
| pub max_encoding_message_size: usize, | ||
| /// Maximum message size for decoding. Defaults to 32 MB (`32_000_000` bytes). | ||
| #[serde(default = "GrpcHeapServiceConfig::default_max_decoding_message_size")] | ||
| pub max_decoding_message_size: usize, | ||
| /// Memberlist provider configuration. Defaults to the `rust-log-service-memberlist` custom resource in the `chroma` namespace, because the heap service is colocated with the log service. | ||
| #[serde(default = "GrpcHeapServiceConfig::default_memberlist_provider")] | ||
| pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig, | ||
| /// Assignment policy. Must match the log service's policy (RendezvousHashing + Murmur3) for data locality. | ||
| #[serde(default = "GrpcHeapServiceConfig::default_assignment")] | ||
| pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig, | ||
| /// Port the heap service listens on. Defaults to 50052. | ||
| #[serde(default = "GrpcHeapServiceConfig::default_port")] | ||
| pub port: u16, | ||
| } | ||
|
|
||
| impl GrpcHeapServiceConfig { | ||
| fn default_enabled() -> bool { | ||
| false | ||
| } | ||
|
|
||
| fn default_connect_timeout_ms() -> u64 { | ||
| 5000 | ||
| } | ||
|
|
||
| fn default_request_timeout_ms() -> u64 { | ||
| 5000 | ||
| } | ||
|
|
||
| fn default_max_encoding_message_size() -> usize { | ||
| 32_000_000 | ||
| } | ||
|
|
||
| fn default_max_decoding_message_size() -> usize { | ||
| 32_000_000 | ||
| } | ||
|
|
||
| fn default_memberlist_provider() -> chroma_memberlist::config::MemberlistProviderConfig { | ||
| chroma_memberlist::config::MemberlistProviderConfig::CustomResource( | ||
| CustomResourceMemberlistProviderConfig { | ||
| kube_namespace: "chroma".to_string(), | ||
| memberlist_name: "rust-log-service-memberlist".to_string(), // Reuses the log service's memberlist: heap and log services are colocated | ||
| queue_size: 100, | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| fn default_assignment() -> chroma_config::assignment::config::AssignmentPolicyConfig { | ||
| // IMPORTANT: Keep this identical to the log service assignment policy | ||
| // (RendezvousHashing + Murmur3). Heap and log services are colocated, so sharing | ||
| // the policy sends a collection's heap operations to the same node as its log operations. | ||
| chroma_config::assignment::config::AssignmentPolicyConfig::RendezvousHashing( | ||
| chroma_config::assignment::config::RendezvousHashingAssignmentPolicyConfig { | ||
| hasher: chroma_config::assignment::config::HasherType::Murmur3, | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| fn default_port() -> u16 { | ||
| 50052 | ||
| } | ||
| } | ||
|
|
||
| /// Builds a config in which every field takes the value of its `default_*` helper, the same helpers named in the struct's `#[serde(default = ...)]` attributes. | ||
| impl Default for GrpcHeapServiceConfig { | ||
| fn default() -> Self { | ||
| GrpcHeapServiceConfig { | ||
| enabled: GrpcHeapServiceConfig::default_enabled(), | ||
| connect_timeout_ms: GrpcHeapServiceConfig::default_connect_timeout_ms(), | ||
| request_timeout_ms: GrpcHeapServiceConfig::default_request_timeout_ms(), | ||
| max_encoding_message_size: GrpcHeapServiceConfig::default_max_encoding_message_size(), | ||
| max_decoding_message_size: GrpcHeapServiceConfig::default_max_decoding_message_size(), | ||
| memberlist_provider: GrpcHeapServiceConfig::default_memberlist_provider(), | ||
| assignment: GrpcHeapServiceConfig::default_assignment(), | ||
| port: GrpcHeapServiceConfig::default_port(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Configuration for the heap service client, selected by transport. `Grpc` is the only variant defined here. | ||
| #[derive(Deserialize, Clone, Serialize, Debug)] | ||
| pub enum HeapServiceConfig { | ||
| /// gRPC-based heap service configuration. Also accepted under the lowercase name `grpc` when deserializing. | ||
| #[serde(alias = "grpc")] | ||
| Grpc(GrpcHeapServiceConfig), | ||
| } | ||
|
|
||
| /// Defaults to the `Grpc` variant wrapping `GrpcHeapServiceConfig::default()`. | ||
| impl Default for HeapServiceConfig { | ||
| fn default() -> Self { | ||
| HeapServiceConfig::Grpc(GrpcHeapServiceConfig::default()) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
For consistency with Tonic best practices and other parts of the codebase that create gRPC endpoints, use the
`http://` scheme for plaintext connections. While `grpc://` may work with some gRPC implementations, `http://` is the standard scheme for plaintext gRPC connections in Tonic, as documented in official examples and used throughout the Tonic ecosystem. For TLS connections, use `https://`.
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, propel.