Skip to content

Commit 34d705e

Browse files
committed
[ENH]: Add heaptender client to compaction and make finishtask use this
1 parent d525f8f commit 34d705e

File tree

13 files changed

+510
-11
lines changed

13 files changed

+510
-11
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/frontend/src/executor/distributed.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
7474
config.connections_per_node,
7575
config.connect_timeout_ms,
7676
config.request_timeout_ms,
77+
50051,
7778
ClientOptions::new(Some(config.max_query_service_response_size_bytes)),
7879
);
7980
let client_manager_handle = system.start_component(client_manager);

rust/log/src/grpc_log.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ impl Configurable<(GrpcLogConfig, System)> for GrpcLog {
263263
1,
264264
my_config.connect_timeout_ms,
265265
my_config.request_timeout_ms,
266+
50051,
266267
ClientOptions::new(Some(my_config.max_decoding_message_size)),
267268
);
268269
let client_manager_handle = system.start_component(client_manager);

rust/memberlist/src/client_manager.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use chroma_error::ChromaError;
77
use chroma_system::{Component, ComponentContext, Handler};
88
use chroma_tracing::GrpcClientTraceService;
99
use chroma_types::chroma_proto::{
10-
log_service_client::LogServiceClient, query_executor_client::QueryExecutorClient,
10+
heap_tender_service_client::HeapTenderServiceClient, log_service_client::LogServiceClient,
11+
query_executor_client::QueryExecutorClient,
1112
};
1213
use parking_lot::RwLock;
1314
use std::{
@@ -171,6 +172,7 @@ pub struct ClientManager<T> {
171172
connections_per_node: usize,
172173
connect_timeout_ms: u64,
173174
request_timeout_ms: u64,
175+
port: u16,
174176
old_memberlist: Memberlist,
175177
options: ClientOptions,
176178
}
@@ -184,6 +186,7 @@ where
184186
connections_per_node: usize,
185187
connect_timeout_ms: u64,
186188
request_timeout_ms: u64,
189+
port: u16,
187190
options: ClientOptions,
188191
) -> Self {
189192
Self {
@@ -192,6 +195,7 @@ where
192195
connections_per_node,
193196
connect_timeout_ms,
194197
request_timeout_ms,
198+
port,
195199
old_memberlist: Memberlist::new(),
196200
options,
197201
}
@@ -233,8 +237,7 @@ where
233237
}
234238

235239
async fn add_ip_for_node(&mut self, ip: String, node: &str) {
236-
// TODO: Configure the port
237-
let ip_with_port = format!("http://{}:{}", ip, 50051);
240+
let ip_with_port = format!("http://{}:{}", ip, self.port);
238241
let endpoint = match Endpoint::from_shared(ip_with_port) {
239242
Ok(endpoint) => endpoint
240243
.connect_timeout(std::time::Duration::from_millis(self.connect_timeout_ms))
@@ -417,6 +420,17 @@ impl ClientFactory
417420
}
418421
}
419422

423+
impl ClientFactory
424+
for HeapTenderServiceClient<chroma_tracing::GrpcClientTraceService<tonic::transport::Channel>>
425+
{
426+
fn new_from_channel(channel: GrpcClientTraceService<Channel>) -> Self {
427+
HeapTenderServiceClient::new(channel)
428+
}
429+
fn max_decoding_message_size(self, max_size: usize) -> Self {
430+
self.max_decoding_message_size(max_size)
431+
}
432+
}
433+
420434
#[cfg(test)]
421435
mod test {
422436
use super::super::memberlist_provider::Member;
@@ -436,6 +450,7 @@ mod test {
436450
1,
437451
1000,
438452
1000,
453+
50051,
439454
ClientOptions::default(),
440455
);
441456
(client_manager, client_assigner)

rust/s3heap/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,18 @@ parquet = { workspace = true }
1414
serde = { workspace = true }
1515
serde_json = { workspace = true }
1616
thiserror = { workspace = true }
17+
tonic = { workspace = true }
18+
tower = { workspace = true }
19+
tracing = { workspace = true }
1720
uuid = { workspace = true }
1821

22+
chroma-config = { workspace = true }
1923
chroma-error = { workspace = true }
24+
chroma-memberlist = { workspace = true }
2025
chroma-storage = { workspace = true }
26+
chroma-system = { workspace = true }
27+
chroma-tracing = { workspace = true }
28+
chroma-types = { workspace = true }
2129
wal3 = { workspace = true }
2230

2331
[dev-dependencies]

rust/s3heap/src/client/config.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use chroma_memberlist::config::CustomResourceMemberlistProviderConfig;
2+
use serde::{Deserialize, Serialize};
3+
4+
/// Configuration for the gRPC heap service client.
5+
///
6+
/// This configures how to connect to the heap tender service instances via memberlist.
7+
#[derive(Deserialize, Clone, Serialize, Debug)]
8+
pub struct GrpcHeapServiceConfig {
9+
/// Whether the heap service client is enabled. Defaults to false.
10+
#[serde(default = "GrpcHeapServiceConfig::default_enabled")]
11+
pub enabled: bool,
12+
/// Connection timeout in milliseconds. Defaults to 5000ms.
13+
#[serde(default = "GrpcHeapServiceConfig::default_connect_timeout_ms")]
14+
pub connect_timeout_ms: u64,
15+
/// Request timeout in milliseconds. Defaults to 5000ms.
16+
#[serde(default = "GrpcHeapServiceConfig::default_request_timeout_ms")]
17+
pub request_timeout_ms: u64,
18+
/// Maximum message size for encoding. Defaults to 32MB.
19+
#[serde(default = "GrpcHeapServiceConfig::default_max_encoding_message_size")]
20+
pub max_encoding_message_size: usize,
21+
/// Maximum message size for decoding. Defaults to 32MB.
22+
#[serde(default = "GrpcHeapServiceConfig::default_max_decoding_message_size")]
23+
pub max_decoding_message_size: usize,
24+
/// Memberlist provider configuration. Defaults to rust-log-service-memberlist (colocated).
25+
#[serde(default = "GrpcHeapServiceConfig::default_memberlist_provider")]
26+
pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
27+
/// Assignment policy. Must match log service (RendezvousHashing + Murmur3) for data locality.
28+
#[serde(default = "GrpcHeapServiceConfig::default_assignment")]
29+
pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig,
30+
/// Port the heap service listens on. Defaults to 50052.
31+
#[serde(default = "GrpcHeapServiceConfig::default_port")]
32+
pub port: u16,
33+
}
34+
35+
impl GrpcHeapServiceConfig {
36+
fn default_enabled() -> bool {
37+
false
38+
}
39+
40+
fn default_connect_timeout_ms() -> u64 {
41+
5000
42+
}
43+
44+
fn default_request_timeout_ms() -> u64 {
45+
5000
46+
}
47+
48+
fn default_max_encoding_message_size() -> usize {
49+
32_000_000
50+
}
51+
52+
fn default_max_decoding_message_size() -> usize {
53+
32_000_000
54+
}
55+
56+
fn default_memberlist_provider() -> chroma_memberlist::config::MemberlistProviderConfig {
57+
chroma_memberlist::config::MemberlistProviderConfig::CustomResource(
58+
CustomResourceMemberlistProviderConfig {
59+
kube_namespace: "chroma".to_string(),
60+
memberlist_name: "rust-log-service-memberlist".to_string(), // Colocated with log service
61+
queue_size: 100,
62+
},
63+
)
64+
}
65+
66+
fn default_assignment() -> chroma_config::assignment::config::AssignmentPolicyConfig {
67+
// IMPORTANT: Must match log service assignment policy (RendezvousHashing + Murmur3)
68+
// since heap and log services are colocated. This ensures that a collection's
69+
// heap operations go to the same node as its log operations.
70+
chroma_config::assignment::config::AssignmentPolicyConfig::RendezvousHashing(
71+
chroma_config::assignment::config::RendezvousHashingAssignmentPolicyConfig {
72+
hasher: chroma_config::assignment::config::HasherType::Murmur3,
73+
},
74+
)
75+
}
76+
77+
fn default_port() -> u16 {
78+
50052
79+
}
80+
}
81+
82+
impl Default for GrpcHeapServiceConfig {
83+
fn default() -> Self {
84+
GrpcHeapServiceConfig {
85+
enabled: GrpcHeapServiceConfig::default_enabled(),
86+
connect_timeout_ms: GrpcHeapServiceConfig::default_connect_timeout_ms(),
87+
request_timeout_ms: GrpcHeapServiceConfig::default_request_timeout_ms(),
88+
max_encoding_message_size: GrpcHeapServiceConfig::default_max_encoding_message_size(),
89+
max_decoding_message_size: GrpcHeapServiceConfig::default_max_decoding_message_size(),
90+
memberlist_provider: GrpcHeapServiceConfig::default_memberlist_provider(),
91+
assignment: GrpcHeapServiceConfig::default_assignment(),
92+
port: GrpcHeapServiceConfig::default_port(),
93+
}
94+
}
95+
}
96+
97+
/// Configuration for heap service client.
98+
#[derive(Deserialize, Clone, Serialize, Debug)]
99+
pub enum HeapServiceConfig {
100+
/// gRPC-based heap service configuration.
101+
#[serde(alias = "grpc")]
102+
Grpc(GrpcHeapServiceConfig),
103+
}
104+
105+
impl Default for HeapServiceConfig {
106+
fn default() -> Self {
107+
HeapServiceConfig::Grpc(GrpcHeapServiceConfig::default())
108+
}
109+
}

0 commit comments

Comments
 (0)