Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ k8s_resource(
k8s_resource('postgres', resource_deps=['k8s_setup'], labels=["infrastructure"], port_forwards='5432:5432')
# Jobs are suffixed with the image tag to ensure they are unique. In this context, the image tag is defined in k8s/distributed-chroma/values.yaml.
k8s_resource('sysdb-migration-latest', resource_deps=['postgres'], labels=["infrastructure"])
k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051'], resource_deps=['minio-deployment'])
k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051', '50052:50052'], resource_deps=['minio-deployment'])
k8s_resource('sysdb', resource_deps=['sysdb-migration-latest'], labels=["chroma"], port_forwards='50051:50051')
k8s_resource('rust-frontend-service', resource_deps=['sysdb', 'rust-log-service'], labels=["chroma"], port_forwards='8000:8000')
k8s_resource('query-service', resource_deps=['sysdb'], labels=["chroma"], port_forwards='50053:50051')
Expand Down
7 changes: 7 additions & 0 deletions rust/frontend/src/executor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ fn default_max_query_service_response_size_bytes() -> usize {
1024 * 1024 * 32
}

fn default_query_service_port() -> u16 {
50051
}

/// Configuration for the distributed executor.
/// # Fields
/// - `connections_per_node` - The number of connections to maintain per node
Expand All @@ -22,6 +26,7 @@ fn default_max_query_service_response_size_bytes() -> usize {
/// - `request_timeout_ms` - The timeout for the request
/// - `assignment` - The assignment policy to use for routing requests
/// - `memberlist_provider` - The memberlist provider to use for getting the list of nodes
/// - `port` - The port the query service listens on. Defaults to 50051.
#[derive(Deserialize, Clone, Serialize, Debug)]
pub struct DistributedExecutorConfig {
pub connections_per_node: usize,
Expand All @@ -36,6 +41,8 @@ pub struct DistributedExecutorConfig {
pub max_query_service_response_size_bytes: usize,
#[serde(default = "ClientSelectionConfig::default")]
pub client_selection_config: ClientSelectionConfig,
#[serde(default = "default_query_service_port")]
pub port: u16,
}

#[derive(Deserialize, Clone, Serialize, Debug)]
Expand Down
1 change: 1 addition & 0 deletions rust/frontend/src/executor/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
config.connections_per_node,
config.connect_timeout_ms,
config.request_timeout_ms,
config.port,
ClientOptions::new(Some(config.max_query_service_response_size_bytes)),
);
let client_manager_handle = system.start_component(client_manager);
Expand Down
7 changes: 7 additions & 0 deletions rust/log/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub struct GrpcLogConfig {
pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
#[serde(default = "GrpcLogConfig::default_assignment")]
pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig,
#[serde(default = "GrpcLogConfig::default_port")]
pub port: u16,
}

impl GrpcLogConfig {
Expand Down Expand Up @@ -51,6 +53,10 @@ impl GrpcLogConfig {
},
)
}

fn default_port() -> u16 {
50051
}
}

impl Default for GrpcLogConfig {
Expand All @@ -62,6 +68,7 @@ impl Default for GrpcLogConfig {
max_decoding_message_size: GrpcLogConfig::default_max_decoding_message_size(),
memberlist_provider: GrpcLogConfig::default_memberlist_provider(),
assignment: GrpcLogConfig::default_assignment(),
port: GrpcLogConfig::default_port(),
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl Configurable<(GrpcLogConfig, System)> for GrpcLog {
1,
my_config.connect_timeout_ms,
my_config.request_timeout_ms,
my_config.port,
ClientOptions::new(Some(my_config.max_decoding_message_size)),
);
let client_manager_handle = system.start_component(client_manager);
Expand Down Expand Up @@ -640,13 +641,14 @@ impl GrpcLog {
ordinal: u64,
) -> Result<(), GarbageCollectError> {
// NOTE(rescrv): Use a raw LogServiceClient so we can open by stateful set ordinal.
let port = self.config.port;
let endpoint_res = match Endpoint::from_shared(format!(
"grpc://rust-log-service-{ordinal}.rust-log-service:50051"
"grpc://rust-log-service-{ordinal}.rust-log-service:{port}"
)) {
Comment on lines 645 to 647
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

For consistency with Tonic best practices and other parts of the codebase that create gRPC endpoints, use the http:// scheme for plaintext connections. While grpc:// may work with some gRPC implementations, http:// is the standard scheme for plaintext gRPC connections in Tonic, as documented in official examples and used throughout the Tonic ecosystem. For TLS connections, use https://.

Suggested change
let endpoint_res = match Endpoint::from_shared(format!(
"grpc://rust-log-service-{ordinal}.rust-log-service:50051"
"grpc://rust-log-service-{ordinal}.rust-log-service:{port}"
)) {
let endpoint_res = match Endpoint::from_shared(format!(
"http://rust-log-service-{ordinal}.rust-log-service:{port}"
)) {

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**BestPractice**]

For consistency with Tonic best practices and other parts of the codebase that create gRPC endpoints, use the `http://` scheme for plaintext connections. While `grpc://` may work with some gRPC implementations, `http://` is the standard scheme for plaintext gRPC connections in Tonic, as documented in official examples and used throughout the Tonic ecosystem. For TLS connections, use `https://`.

```suggestion
        let endpoint_res = match Endpoint::from_shared(format!(
            "http://rust-log-service-{ordinal}.rust-log-service:{port}"
        )) {
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/log/src/grpc_log.rs
Line: 647

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, propel.

Ok(endpoint) => endpoint,
Err(e) => {
return Err(GarbageCollectError::Resolution(format!(
"could not connect to rust-log-service-{ordinal}:50051: {}",
"could not connect to rust-log-service-{ordinal}:{port}: {}",
e
)));
}
Expand All @@ -656,7 +658,7 @@ impl GrpcLog {
.timeout(Duration::from_millis(self.config.request_timeout_ms));
let channel = endpoint_res.connect().await.map_err(|err| {
GarbageCollectError::Resolution(format!(
"could not connect to rust-log-service-{ordinal}:50051: {}",
"could not connect to rust-log-service-{ordinal}:{port}: {}",
err
))
})?;
Expand Down
21 changes: 18 additions & 3 deletions rust/memberlist/src/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use chroma_error::ChromaError;
use chroma_system::{Component, ComponentContext, Handler};
use chroma_tracing::GrpcClientTraceService;
use chroma_types::chroma_proto::{
log_service_client::LogServiceClient, query_executor_client::QueryExecutorClient,
heap_tender_service_client::HeapTenderServiceClient, log_service_client::LogServiceClient,
query_executor_client::QueryExecutorClient,
};
use parking_lot::RwLock;
use std::{
Expand Down Expand Up @@ -171,6 +172,7 @@ pub struct ClientManager<T> {
connections_per_node: usize,
connect_timeout_ms: u64,
request_timeout_ms: u64,
port: u16,
old_memberlist: Memberlist,
options: ClientOptions,
}
Expand All @@ -184,6 +186,7 @@ where
connections_per_node: usize,
connect_timeout_ms: u64,
request_timeout_ms: u64,
port: u16,
options: ClientOptions,
) -> Self {
Self {
Expand All @@ -192,6 +195,7 @@ where
connections_per_node,
connect_timeout_ms,
request_timeout_ms,
port,
old_memberlist: Memberlist::new(),
options,
}
Expand Down Expand Up @@ -233,8 +237,7 @@ where
}

async fn add_ip_for_node(&mut self, ip: String, node: &str) {
// TODO: Configure the port
let ip_with_port = format!("http://{}:{}", ip, 50051);
let ip_with_port = format!("http://{}:{}", ip, self.port);
let endpoint = match Endpoint::from_shared(ip_with_port) {
Ok(endpoint) => endpoint
.connect_timeout(std::time::Duration::from_millis(self.connect_timeout_ms))
Expand Down Expand Up @@ -417,6 +420,17 @@ impl ClientFactory
}
}

impl ClientFactory
for HeapTenderServiceClient<chroma_tracing::GrpcClientTraceService<tonic::transport::Channel>>
{
fn new_from_channel(channel: GrpcClientTraceService<Channel>) -> Self {
HeapTenderServiceClient::new(channel)
}
fn max_decoding_message_size(self, max_size: usize) -> Self {
self.max_decoding_message_size(max_size)
}
}

#[cfg(test)]
mod test {
use super::super::memberlist_provider::Member;
Expand All @@ -436,6 +450,7 @@ mod test {
1,
1000,
1000,
50051,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

This test hardcodes the port 50051. While this is a test, it's good practice to use a constant or a test configuration setup to avoid magic numbers, making the test easier to understand and maintain.

Context for Agents
[**BestPractice**]

This test hardcodes the port `50051`. While this is a test, it's good practice to use a constant or a test configuration setup to avoid magic numbers, making the test easier to understand and maintain.

File: rust/memberlist/src/client_manager.rs
Line: 453

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with it.

ClientOptions::default(),
);
(client_manager, client_assigner)
Expand Down
3 changes: 3 additions & 0 deletions rust/s3heap-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ futures = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tonic-health = { workspace = true }
Expand All @@ -19,8 +20,10 @@ tracing = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-memberlist = { workspace = true }
chroma-storage = { workspace = true }
chroma-sysdb = { workspace = true }
chroma-system = { workspace = true }
chroma-tracing = { workspace = true, features = ["grpc"] }
chroma-types = { workspace = true }
s3heap = { workspace = true }
Expand Down
109 changes: 109 additions & 0 deletions rust/s3heap-service/src/client/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use chroma_memberlist::config::CustomResourceMemberlistProviderConfig;
use serde::{Deserialize, Serialize};

/// Configuration for the gRPC heap service client.
///
/// This configures how to connect to the heap tender service instances via memberlist.
#[derive(Deserialize, Clone, Serialize, Debug)]
pub struct GrpcHeapServiceConfig {
/// Whether the heap service client is enabled. Defaults to false.
#[serde(default = "GrpcHeapServiceConfig::default_enabled")]
pub enabled: bool,
/// Connection timeout in milliseconds. Defaults to 5000ms.
#[serde(default = "GrpcHeapServiceConfig::default_connect_timeout_ms")]
pub connect_timeout_ms: u64,
/// Request timeout in milliseconds. Defaults to 5000ms.
#[serde(default = "GrpcHeapServiceConfig::default_request_timeout_ms")]
pub request_timeout_ms: u64,
/// Maximum message size for encoding. Defaults to 32MB.
#[serde(default = "GrpcHeapServiceConfig::default_max_encoding_message_size")]
pub max_encoding_message_size: usize,
/// Maximum message size for decoding. Defaults to 32MB.
#[serde(default = "GrpcHeapServiceConfig::default_max_decoding_message_size")]
pub max_decoding_message_size: usize,
/// Memberlist provider configuration. Defaults to rust-log-service-memberlist (colocated).
#[serde(default = "GrpcHeapServiceConfig::default_memberlist_provider")]
pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
/// Assignment policy. Must match log service (RendezvousHashing + Murmur3) for data locality.
#[serde(default = "GrpcHeapServiceConfig::default_assignment")]
pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig,
/// Port the heap service listens on. Defaults to 50052.
#[serde(default = "GrpcHeapServiceConfig::default_port")]
pub port: u16,
}

impl GrpcHeapServiceConfig {
fn default_enabled() -> bool {
false
}

fn default_connect_timeout_ms() -> u64 {
5000
}

fn default_request_timeout_ms() -> u64 {
5000
}

fn default_max_encoding_message_size() -> usize {
32_000_000
}

fn default_max_decoding_message_size() -> usize {
32_000_000
}

fn default_memberlist_provider() -> chroma_memberlist::config::MemberlistProviderConfig {
chroma_memberlist::config::MemberlistProviderConfig::CustomResource(
CustomResourceMemberlistProviderConfig {
kube_namespace: "chroma".to_string(),
memberlist_name: "rust-log-service-memberlist".to_string(), // Colocated with log service
queue_size: 100,
},
)
}

fn default_assignment() -> chroma_config::assignment::config::AssignmentPolicyConfig {
// IMPORTANT: Must match log service assignment policy (RendezvousHashing + Murmur3)
// since heap and log services are colocated. This ensures that a collection's
// heap operations go to the same node as its log operations.
chroma_config::assignment::config::AssignmentPolicyConfig::RendezvousHashing(
chroma_config::assignment::config::RendezvousHashingAssignmentPolicyConfig {
hasher: chroma_config::assignment::config::HasherType::Murmur3,
},
)
}

fn default_port() -> u16 {
50052
}
}

impl Default for GrpcHeapServiceConfig {
fn default() -> Self {
GrpcHeapServiceConfig {
enabled: GrpcHeapServiceConfig::default_enabled(),
connect_timeout_ms: GrpcHeapServiceConfig::default_connect_timeout_ms(),
request_timeout_ms: GrpcHeapServiceConfig::default_request_timeout_ms(),
max_encoding_message_size: GrpcHeapServiceConfig::default_max_encoding_message_size(),
max_decoding_message_size: GrpcHeapServiceConfig::default_max_decoding_message_size(),
memberlist_provider: GrpcHeapServiceConfig::default_memberlist_provider(),
assignment: GrpcHeapServiceConfig::default_assignment(),
port: GrpcHeapServiceConfig::default_port(),
}
}
}

/// Configuration for heap service client.
#[derive(Deserialize, Clone, Serialize, Debug)]
pub enum HeapServiceConfig {
/// gRPC-based heap service configuration.
#[serde(alias = "grpc")]
Grpc(GrpcHeapServiceConfig),
}

impl Default for HeapServiceConfig {
fn default() -> Self {
HeapServiceConfig::Grpc(GrpcHeapServiceConfig::default())
}
}
Loading