Skip to content

Commit 62d07a3

Browse files
committed
[ENH]: Add heaptender client to compaction and make finishtask use this
1 parent c9cd620 commit 62d07a3

File tree

16 files changed

+565
-23
lines changed

16 files changed

+565
-23
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Tiltfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ k8s_resource(
237237
k8s_resource('postgres', resource_deps=['k8s_setup'], labels=["infrastructure"], port_forwards='5432:5432')
238238
# Jobs are suffixed with the image tag to ensure they are unique. In this context, the image tag is defined in k8s/distributed-chroma/values.yaml.
239239
k8s_resource('sysdb-migration-latest', resource_deps=['postgres'], labels=["infrastructure"])
240-
k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051'], resource_deps=['minio-deployment'])
240+
k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051', '50052:50052'], resource_deps=['minio-deployment'])
241241
k8s_resource('sysdb', resource_deps=['sysdb-migration-latest'], labels=["chroma"], port_forwards='50051:50051')
242242
k8s_resource('rust-frontend-service', resource_deps=['sysdb', 'rust-log-service'], labels=["chroma"], port_forwards='8000:8000')
243243
k8s_resource('query-service', resource_deps=['sysdb'], labels=["chroma"], port_forwards='50053:50051')

rust/frontend/src/executor/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ fn default_max_query_service_response_size_bytes() -> usize {
1414
1024 * 1024 * 32
1515
}
1616

17+
/// Serde default for `DistributedExecutorConfig::port`: supplies 50051 when the `port` key is
/// absent from the deserialized configuration.
fn default_query_service_port() -> u16 {
18+
50051
19+
}
20+
1721
/// Configuration for the distributed executor.
1822
/// # Fields
1923
/// - `connections_per_node` - The number of connections to maintain per node
@@ -22,6 +26,7 @@ fn default_max_query_service_response_size_bytes() -> usize {
2226
/// - `request_timeout_ms` - The timeout for the request
2327
/// - `assignment` - The assignment policy to use for routing requests
2428
/// - `memberlist_provider` - The memberlist provider to use for getting the list of nodes
29+
/// - `port` - The port the query service listens on. Defaults to 50051.
2530
#[derive(Deserialize, Clone, Serialize, Debug)]
2631
pub struct DistributedExecutorConfig {
2732
pub connections_per_node: usize,
@@ -36,6 +41,8 @@ pub struct DistributedExecutorConfig {
3641
pub max_query_service_response_size_bytes: usize,
3742
#[serde(default = "ClientSelectionConfig::default")]
3843
pub client_selection_config: ClientSelectionConfig,
44+
#[serde(default = "default_query_service_port")]
45+
pub port: u16,
3946
}
4047

4148
#[derive(Deserialize, Clone, Serialize, Debug)]

rust/frontend/src/executor/distributed.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
7474
config.connections_per_node,
7575
config.connect_timeout_ms,
7676
config.request_timeout_ms,
77+
config.port,
7778
ClientOptions::new(Some(config.max_query_service_response_size_bytes)),
7879
);
7980
let client_manager_handle = system.start_component(client_manager);

rust/log/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ pub struct GrpcLogConfig {
1515
pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
1616
#[serde(default = "GrpcLogConfig::default_assignment")]
1717
pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig,
18+
#[serde(default = "GrpcLogConfig::default_port")]
19+
pub port: u16,
1820
}
1921

2022
impl GrpcLogConfig {
@@ -51,6 +53,10 @@ impl GrpcLogConfig {
5153
},
5254
)
5355
}
56+
57+
/// Default log-service port (50051). Used both as the serde default for the `port` field and by
/// the `Default` impl of `GrpcLogConfig`.
fn default_port() -> u16 {
58+
50051
59+
}
5460
}
5561

5662
impl Default for GrpcLogConfig {
@@ -62,6 +68,7 @@ impl Default for GrpcLogConfig {
6268
max_decoding_message_size: GrpcLogConfig::default_max_decoding_message_size(),
6369
memberlist_provider: GrpcLogConfig::default_memberlist_provider(),
6470
assignment: GrpcLogConfig::default_assignment(),
71+
port: GrpcLogConfig::default_port(),
6572
}
6673
}
6774
}

rust/log/src/grpc_log.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ impl Configurable<(GrpcLogConfig, System)> for GrpcLog {
263263
1,
264264
my_config.connect_timeout_ms,
265265
my_config.request_timeout_ms,
266+
my_config.port,
266267
ClientOptions::new(Some(my_config.max_decoding_message_size)),
267268
);
268269
let client_manager_handle = system.start_component(client_manager);
@@ -640,13 +641,14 @@ impl GrpcLog {
640641
ordinal: u64,
641642
) -> Result<(), GarbageCollectError> {
642643
// NOTE(rescrv): Use a raw LogServiceClient so we can open by stateful set ordinal.
644+
let port = self.config.port;
643645
let endpoint_res = match Endpoint::from_shared(format!(
644-
"grpc://rust-log-service-{ordinal}.rust-log-service:50051"
646+
"grpc://rust-log-service-{ordinal}.rust-log-service:{port}"
645647
)) {
646648
Ok(endpoint) => endpoint,
647649
Err(e) => {
648650
return Err(GarbageCollectError::Resolution(format!(
649-
"could not connect to rust-log-service-{ordinal}:50051: {}",
651+
"could not connect to rust-log-service-{ordinal}:{port}: {}",
650652
e
651653
)));
652654
}
@@ -656,7 +658,7 @@ impl GrpcLog {
656658
.timeout(Duration::from_millis(self.config.request_timeout_ms));
657659
let channel = endpoint_res.connect().await.map_err(|err| {
658660
GarbageCollectError::Resolution(format!(
659-
"could not connect to rust-log-service-{ordinal}:50051: {}",
661+
"could not connect to rust-log-service-{ordinal}:{port}: {}",
660662
err
661663
))
662664
})?;

rust/memberlist/src/client_manager.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use chroma_error::ChromaError;
77
use chroma_system::{Component, ComponentContext, Handler};
88
use chroma_tracing::GrpcClientTraceService;
99
use chroma_types::chroma_proto::{
10-
log_service_client::LogServiceClient, query_executor_client::QueryExecutorClient,
10+
heap_tender_service_client::HeapTenderServiceClient, log_service_client::LogServiceClient,
11+
query_executor_client::QueryExecutorClient,
1112
};
1213
use parking_lot::RwLock;
1314
use std::{
@@ -171,6 +172,7 @@ pub struct ClientManager<T> {
171172
connections_per_node: usize,
172173
connect_timeout_ms: u64,
173174
request_timeout_ms: u64,
175+
port: u16,
174176
old_memberlist: Memberlist,
175177
options: ClientOptions,
176178
}
@@ -184,6 +186,7 @@ where
184186
connections_per_node: usize,
185187
connect_timeout_ms: u64,
186188
request_timeout_ms: u64,
189+
port: u16,
187190
options: ClientOptions,
188191
) -> Self {
189192
Self {
@@ -192,6 +195,7 @@ where
192195
connections_per_node,
193196
connect_timeout_ms,
194197
request_timeout_ms,
198+
port,
195199
old_memberlist: Memberlist::new(),
196200
options,
197201
}
@@ -233,8 +237,7 @@ where
233237
}
234238

235239
async fn add_ip_for_node(&mut self, ip: String, node: &str) {
236-
// TODO: Configure the port
237-
let ip_with_port = format!("http://{}:{}", ip, 50051);
240+
let ip_with_port = format!("http://{}:{}", ip, self.port);
238241
let endpoint = match Endpoint::from_shared(ip_with_port) {
239242
Ok(endpoint) => endpoint
240243
.connect_timeout(std::time::Duration::from_millis(self.connect_timeout_ms))
@@ -417,6 +420,17 @@ impl ClientFactory
417420
}
418421
}
419422

423+
// `ClientFactory` implementation for the heap tender gRPC client over a traced channel.
// NOTE(review): presumably this is what allows `ClientManager<T>` to be instantiated with the
// heap tender client type — confirm against the trait bounds on `ClientManager`.
impl ClientFactory
424+
for HeapTenderServiceClient<chroma_tracing::GrpcClientTraceService<tonic::transport::Channel>>
425+
{
426+
// Wraps an already-constructed traced channel in a new `HeapTenderServiceClient`.
fn new_from_channel(channel: GrpcClientTraceService<Channel>) -> Self {
427+
HeapTenderServiceClient::new(channel)
428+
}
429+
// Forwards `max_size` to the client's own method of the same name and returns the client.
fn max_decoding_message_size(self, max_size: usize) -> Self {
430+
// NOTE(review): this is not self-recursion only if the generated client has an inherent
// `max_decoding_message_size` method, since Rust resolves inherent methods before trait
// methods. Assumes the tonic-generated client provides one — confirm; otherwise this call
// would recurse without bound.
self.max_decoding_message_size(max_size)
431+
}
432+
}
433+
420434
#[cfg(test)]
421435
mod test {
422436
use super::super::memberlist_provider::Member;
@@ -436,6 +450,7 @@ mod test {
436450
1,
437451
1000,
438452
1000,
453+
50051,
439454
ClientOptions::default(),
440455
);
441456
(client_manager, client_assigner)

rust/s3heap-service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ futures = { workspace = true }
1111
prost-types = { workspace = true }
1212
serde = { workspace = true }
1313
serde_json = { workspace = true }
14+
thiserror = { workspace = true }
1415
tokio = { workspace = true }
1516
tonic = { workspace = true }
1617
tonic-health = { workspace = true }
@@ -19,8 +20,10 @@ tracing = { workspace = true }
1920

2021
chroma-config = { workspace = true }
2122
chroma-error = { workspace = true }
23+
chroma-memberlist = { workspace = true }
2224
chroma-storage = { workspace = true }
2325
chroma-sysdb = { workspace = true }
26+
chroma-system = { workspace = true }
2427
chroma-tracing = { workspace = true, features = ["grpc"] }
2528
chroma-types = { workspace = true }
2629
s3heap = { workspace = true }
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use chroma_memberlist::config::CustomResourceMemberlistProviderConfig;
2+
use serde::{Deserialize, Serialize};
3+
4+
/// Configuration for the gRPC heap service client.
5+
///
6+
/// This configures how to connect to the heap tender service instances via memberlist.
/// Every field carries its own serde default, so any subset of keys (including all of them)
/// may be omitted from the serialized configuration.
7+
#[derive(Deserialize, Clone, Serialize, Debug)]
8+
pub struct GrpcHeapServiceConfig {
9+
/// Whether the heap service client is enabled. Defaults to `false`.
10+
#[serde(default = "GrpcHeapServiceConfig::default_enabled")]
11+
pub enabled: bool,
12+
/// Connection timeout in milliseconds. Defaults to 5000 ms.
13+
#[serde(default = "GrpcHeapServiceConfig::default_connect_timeout_ms")]
14+
pub connect_timeout_ms: u64,
15+
/// Request timeout in milliseconds. Defaults to 5000 ms.
16+
#[serde(default = "GrpcHeapServiceConfig::default_request_timeout_ms")]
17+
pub request_timeout_ms: u64,
18+
/// Maximum size of an encoded message. Defaults to 32_000_000 (32 MB, decimal).
19+
#[serde(default = "GrpcHeapServiceConfig::default_max_encoding_message_size")]
20+
pub max_encoding_message_size: usize,
21+
/// Maximum size of a decoded message. Defaults to 32_000_000 (32 MB, decimal).
22+
#[serde(default = "GrpcHeapServiceConfig::default_max_decoding_message_size")]
23+
pub max_decoding_message_size: usize,
24+
/// Memberlist provider configuration. Defaults to a custom-resource provider that reads
/// `rust-log-service-memberlist` in the `chroma` namespace, because the heap service is
/// colocated with the log service.
25+
#[serde(default = "GrpcHeapServiceConfig::default_memberlist_provider")]
26+
pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
27+
/// Assignment policy. Must match log service (RendezvousHashing + Murmur3) for data locality.
28+
#[serde(default = "GrpcHeapServiceConfig::default_assignment")]
29+
pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig,
30+
/// Port the heap service listens on. Defaults to 50052.
31+
#[serde(default = "GrpcHeapServiceConfig::default_port")]
32+
pub port: u16,
33+
}
34+
35+
// Per-field default providers. Each one is referenced by the matching `#[serde(default = ...)]`
// attribute on `GrpcHeapServiceConfig` and by its `Default` impl.
impl GrpcHeapServiceConfig {
36+
/// The heap service client is disabled unless explicitly enabled.
fn default_enabled() -> bool {
37+
false
38+
}
39+
40+
/// Default connection timeout: 5000 ms.
fn default_connect_timeout_ms() -> u64 {
41+
5000
42+
}
43+
44+
/// Default request timeout: 5000 ms.
fn default_request_timeout_ms() -> u64 {
45+
5000
46+
}
47+
48+
/// Default encoding limit: 32_000_000, i.e. decimal 32 MB rather than 32 MiB (33_554_432).
fn default_max_encoding_message_size() -> usize {
49+
32_000_000
50+
}
51+
52+
/// Default decoding limit: 32_000_000, i.e. decimal 32 MB rather than 32 MiB (33_554_432).
fn default_max_decoding_message_size() -> usize {
53+
32_000_000
54+
}
55+
56+
/// Default memberlist source: the custom-resource provider pointed at the log service's
/// memberlist in the `chroma` namespace, with a queue size of 100.
fn default_memberlist_provider() -> chroma_memberlist::config::MemberlistProviderConfig {
57+
chroma_memberlist::config::MemberlistProviderConfig::CustomResource(
58+
CustomResourceMemberlistProviderConfig {
59+
kube_namespace: "chroma".to_string(),
60+
memberlist_name: "rust-log-service-memberlist".to_string(), // Colocated with log service
61+
queue_size: 100,
62+
},
63+
)
64+
}
65+
66+
/// Default assignment policy: rendezvous hashing with the Murmur3 hasher.
fn default_assignment() -> chroma_config::assignment::config::AssignmentPolicyConfig {
67+
// IMPORTANT: Must match log service assignment policy (RendezvousHashing + Murmur3)
68+
// since heap and log services are colocated. This ensures that a collection's
69+
// heap operations go to the same node as its log operations.
70+
chroma_config::assignment::config::AssignmentPolicyConfig::RendezvousHashing(
71+
chroma_config::assignment::config::RendezvousHashingAssignmentPolicyConfig {
72+
hasher: chroma_config::assignment::config::HasherType::Murmur3,
73+
},
74+
)
75+
}
76+
77+
/// Default heap service port: 50052.
fn default_port() -> u16 {
78+
50052
79+
}
80+
}
81+
82+
// Builds the configuration from the same per-field default functions that the serde attributes
// on the struct reference, so `default()` matches what deserializing an empty config yields.
impl Default for GrpcHeapServiceConfig {
83+
fn default() -> Self {
84+
GrpcHeapServiceConfig {
85+
enabled: GrpcHeapServiceConfig::default_enabled(),
86+
connect_timeout_ms: GrpcHeapServiceConfig::default_connect_timeout_ms(),
87+
request_timeout_ms: GrpcHeapServiceConfig::default_request_timeout_ms(),
88+
max_encoding_message_size: GrpcHeapServiceConfig::default_max_encoding_message_size(),
89+
max_decoding_message_size: GrpcHeapServiceConfig::default_max_decoding_message_size(),
90+
memberlist_provider: GrpcHeapServiceConfig::default_memberlist_provider(),
91+
assignment: GrpcHeapServiceConfig::default_assignment(),
92+
port: GrpcHeapServiceConfig::default_port(),
93+
}
94+
}
95+
}
96+
97+
/// Configuration for the heap service client, selected by transport.
///
/// gRPC is currently the only variant.
98+
#[derive(Deserialize, Clone, Serialize, Debug)]
99+
pub enum HeapServiceConfig {
100+
/// gRPC-based heap service configuration. The serde alias lets the variant be written as
/// `grpc` as well as `Grpc` when deserializing.
101+
#[serde(alias = "grpc")]
102+
Grpc(GrpcHeapServiceConfig),
103+
}
104+
105+
// The default is the gRPC variant wrapping `GrpcHeapServiceConfig::default()`, whose `enabled`
// flag is `false` — so a defaulted heap service client is configured but disabled.
impl Default for HeapServiceConfig {
106+
fn default() -> Self {
107+
HeapServiceConfig::Grpc(GrpcHeapServiceConfig::default())
108+
}
109+
}

0 commit comments

Comments
 (0)