diff --git a/Cargo.lock b/Cargo.lock index af21fcfbd0e..862b247b0ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7135,8 +7135,10 @@ dependencies = [ "async-trait", "chroma-config", "chroma-error", + "chroma-memberlist", "chroma-storage", "chroma-sysdb", + "chroma-system", "chroma-tracing", "chroma-types", "chrono", @@ -7146,6 +7148,7 @@ dependencies = [ "s3heap", "serde", "serde_json", + "thiserror 1.0.69", "tokio", "tonic", "tonic-health", diff --git a/Tiltfile b/Tiltfile index 6035efa2044..3cadeed16f8 100644 --- a/Tiltfile +++ b/Tiltfile @@ -237,7 +237,7 @@ k8s_resource( k8s_resource('postgres', resource_deps=['k8s_setup'], labels=["infrastructure"], port_forwards='5432:5432') # Jobs are suffixed with the image tag to ensure they are unique. In this context, the image tag is defined in k8s/distributed-chroma/values.yaml. k8s_resource('sysdb-migration-latest', resource_deps=['postgres'], labels=["infrastructure"]) -k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051'], resource_deps=['minio-deployment']) +k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051', '50052:50052'], resource_deps=['minio-deployment']) k8s_resource('sysdb', resource_deps=['sysdb-migration-latest'], labels=["chroma"], port_forwards='50051:50051') k8s_resource('rust-frontend-service', resource_deps=['sysdb', 'rust-log-service'], labels=["chroma"], port_forwards='8000:8000') k8s_resource('query-service', resource_deps=['sysdb'], labels=["chroma"], port_forwards='50053:50051') diff --git a/rust/frontend/src/executor/config.rs b/rust/frontend/src/executor/config.rs index 9193527f9eb..52d2cb26ba7 100644 --- a/rust/frontend/src/executor/config.rs +++ b/rust/frontend/src/executor/config.rs @@ -14,6 +14,10 @@ fn default_max_query_service_response_size_bytes() -> usize { 1024 * 1024 * 32 } +fn default_query_service_port() -> u16 { + 50051 +} + /// Configuration for the distributed executor. /// # Fields /// - `connections_per_node` - The number of connections to maintain per node @@ -22,6 +26,7 @@ fn default_max_query_service_response_size_bytes() -> usize { /// - `request_timeout_ms` - The timeout for the request /// - `assignment` - The assignment policy to use for routing requests /// - `memberlist_provider` - The memberlist provider to use for getting the list of nodes +/// - `port` - The port the query service listens on. Defaults to 50051. #[derive(Deserialize, Clone, Serialize, Debug)] pub struct DistributedExecutorConfig { pub connections_per_node: usize, @@ -36,6 +41,8 @@ pub struct DistributedExecutorConfig { pub max_query_service_response_size_bytes: usize, #[serde(default = "ClientSelectionConfig::default")] pub client_selection_config: ClientSelectionConfig, + #[serde(default = "default_query_service_port")] + pub port: u16, } #[derive(Deserialize, Clone, Serialize, Debug)] diff --git a/rust/frontend/src/executor/distributed.rs b/rust/frontend/src/executor/distributed.rs index b7786577b24..d5cf5c68d3f 100644 --- a/rust/frontend/src/executor/distributed.rs +++ b/rust/frontend/src/executor/distributed.rs @@ -74,6 +74,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx config.connections_per_node, config.connect_timeout_ms, config.request_timeout_ms, + config.port, ClientOptions::new(Some(config.max_query_service_response_size_bytes)), ); let client_manager_handle = system.start_component(client_manager); diff --git a/rust/log/src/config.rs b/rust/log/src/config.rs index 38f3e408751..76ae797f7a8 100644 --- a/rust/log/src/config.rs +++ b/rust/log/src/config.rs @@ -15,6 +15,8 @@ pub struct GrpcLogConfig { pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig, #[serde(default = "GrpcLogConfig::default_assignment")] pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig, + #[serde(default = "GrpcLogConfig::default_port")] + pub port: u16, } impl GrpcLogConfig { @@ -51,6 +53,10 @@ impl GrpcLogConfig { }, ) } + + fn default_port() -> u16 { + 50051 + } } impl Default for GrpcLogConfig { @@ -62,6 +68,7 @@ impl Default for GrpcLogConfig { max_decoding_message_size: GrpcLogConfig::default_max_decoding_message_size(), memberlist_provider: GrpcLogConfig::default_memberlist_provider(), assignment: GrpcLogConfig::default_assignment(), + port: GrpcLogConfig::default_port(), } } } diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs index 10251faf618..9213238cb55 100644 --- a/rust/log/src/grpc_log.rs +++ b/rust/log/src/grpc_log.rs @@ -263,6 +263,7 @@ impl Configurable<(GrpcLogConfig, System)> for GrpcLog { 1, my_config.connect_timeout_ms, my_config.request_timeout_ms, + my_config.port, ClientOptions::new(Some(my_config.max_decoding_message_size)), ); let client_manager_handle = system.start_component(client_manager); @@ -640,13 +641,14 @@ impl GrpcLog { ordinal: u64, ) -> Result<(), GarbageCollectError> { // NOTE(rescrv): Use a raw LogServiceClient so we can open by stateful set ordinal. + let port = self.config.port; let endpoint_res = match Endpoint::from_shared(format!( - "grpc://rust-log-service-{ordinal}.rust-log-service:50051" + "grpc://rust-log-service-{ordinal}.rust-log-service:{port}" )) { Ok(endpoint) => endpoint, Err(e) => { return Err(GarbageCollectError::Resolution(format!( - "could not connect to rust-log-service-{ordinal}:50051: {}", + "could not connect to rust-log-service-{ordinal}:{port}: {}", e ))); } @@ -656,7 +658,7 @@ impl GrpcLog { .timeout(Duration::from_millis(self.config.request_timeout_ms)); let channel = endpoint_res.connect().await.map_err(|err| { GarbageCollectError::Resolution(format!( - "could not connect to rust-log-service-{ordinal}:50051: {}", + "could not connect to rust-log-service-{ordinal}:{port}: {}", err )) })?; diff --git a/rust/memberlist/src/client_manager.rs b/rust/memberlist/src/client_manager.rs index 7e65a07859b..62b2465fe48 100644 --- a/rust/memberlist/src/client_manager.rs +++ b/rust/memberlist/src/client_manager.rs @@ -7,7 +7,8 @@ use chroma_error::ChromaError; use chroma_system::{Component, ComponentContext, Handler}; use chroma_tracing::GrpcClientTraceService; use chroma_types::chroma_proto::{ - log_service_client::LogServiceClient, query_executor_client::QueryExecutorClient, + heap_tender_service_client::HeapTenderServiceClient, log_service_client::LogServiceClient, + query_executor_client::QueryExecutorClient, }; use parking_lot::RwLock; use std::{ @@ -171,6 +172,7 @@ pub struct ClientManager { connections_per_node: usize, connect_timeout_ms: u64, request_timeout_ms: u64, + port: u16, old_memberlist: Memberlist, options: ClientOptions, } @@ -184,6 +186,7 @@ where connections_per_node: usize, connect_timeout_ms: u64, request_timeout_ms: u64, + port: u16, options: ClientOptions, ) -> Self { Self { @@ -192,6 +195,7 @@ where connections_per_node, connect_timeout_ms, request_timeout_ms, + port, old_memberlist: Memberlist::new(), options, } @@ -233,8 +237,7 @@ where } async fn add_ip_for_node(&mut self, ip: String, node: &str) { - // TODO: Configure the port - let ip_with_port = format!("http://{}:{}", ip, 50051); + let ip_with_port = format!("http://{}:{}", ip, self.port); let endpoint = match Endpoint::from_shared(ip_with_port) { Ok(endpoint) => endpoint .connect_timeout(std::time::Duration::from_millis(self.connect_timeout_ms)) @@ -417,6 +420,17 @@ impl ClientFactory } } +impl ClientFactory + for HeapTenderServiceClient> +{ + fn new_from_channel(channel: GrpcClientTraceService) -> Self { + HeapTenderServiceClient::new(channel) + } + fn max_decoding_message_size(self, max_size: usize) -> Self { + self.max_decoding_message_size(max_size) + } +} + #[cfg(test)] mod test { use super::super::memberlist_provider::Member; @@ -436,6 +450,7 @@ mod test { 1, 1000, 1000, + 50051, ClientOptions::default(), ); (client_manager, client_assigner) diff --git a/rust/s3heap-service/Cargo.toml b/rust/s3heap-service/Cargo.toml index 9f1992fb738..5455231fca5 100644 --- a/rust/s3heap-service/Cargo.toml +++ b/rust/s3heap-service/Cargo.toml @@ -11,6 +11,7 @@ futures = { workspace = true } prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } tonic-health = { workspace = true } @@ -19,8 +20,10 @@ tracing = { workspace = true } chroma-config = { workspace = true } chroma-error = { workspace = true } +chroma-memberlist = { workspace = true } chroma-storage = { workspace = true } chroma-sysdb = { workspace = true } +chroma-system = { workspace = true } chroma-tracing = { workspace = true, features = ["grpc"] } chroma-types = { workspace = true } s3heap = { workspace = true } diff --git a/rust/s3heap-service/src/client/config.rs b/rust/s3heap-service/src/client/config.rs new file mode 100644 index 00000000000..35fd08ba4bf --- /dev/null +++ b/rust/s3heap-service/src/client/config.rs @@ -0,0 +1,109 @@ +use chroma_memberlist::config::CustomResourceMemberlistProviderConfig; +use serde::{Deserialize, Serialize}; + +/// Configuration for the gRPC heap service client. +/// +/// This configures how to connect to the heap tender service instances via memberlist. +#[derive(Deserialize, Clone, Serialize, Debug)] +pub struct GrpcHeapServiceConfig { + /// Whether the heap service client is enabled. Defaults to false. + #[serde(default = "GrpcHeapServiceConfig::default_enabled")] + pub enabled: bool, + /// Connection timeout in milliseconds. Defaults to 5000ms. + #[serde(default = "GrpcHeapServiceConfig::default_connect_timeout_ms")] + pub connect_timeout_ms: u64, + /// Request timeout in milliseconds. Defaults to 5000ms. + #[serde(default = "GrpcHeapServiceConfig::default_request_timeout_ms")] + pub request_timeout_ms: u64, + /// Maximum message size for encoding. Defaults to 32MB. + #[serde(default = "GrpcHeapServiceConfig::default_max_encoding_message_size")] + pub max_encoding_message_size: usize, + /// Maximum message size for decoding. Defaults to 32MB. + #[serde(default = "GrpcHeapServiceConfig::default_max_decoding_message_size")] + pub max_decoding_message_size: usize, + /// Memberlist provider configuration. Defaults to rust-log-service-memberlist (colocated). + #[serde(default = "GrpcHeapServiceConfig::default_memberlist_provider")] + pub memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig, + /// Assignment policy. Must match log service (RendezvousHashing + Murmur3) for data locality. + #[serde(default = "GrpcHeapServiceConfig::default_assignment")] + pub assignment: chroma_config::assignment::config::AssignmentPolicyConfig, + /// Port the heap service listens on. Defaults to 50052. + #[serde(default = "GrpcHeapServiceConfig::default_port")] + pub port: u16, +} + +impl GrpcHeapServiceConfig { + fn default_enabled() -> bool { + false + } + + fn default_connect_timeout_ms() -> u64 { + 5000 + } + + fn default_request_timeout_ms() -> u64 { + 5000 + } + + fn default_max_encoding_message_size() -> usize { + 32_000_000 + } + + fn default_max_decoding_message_size() -> usize { + 32_000_000 + } + + fn default_memberlist_provider() -> chroma_memberlist::config::MemberlistProviderConfig { + chroma_memberlist::config::MemberlistProviderConfig::CustomResource( + CustomResourceMemberlistProviderConfig { + kube_namespace: "chroma".to_string(), + memberlist_name: "rust-log-service-memberlist".to_string(), // Colocated with log service + queue_size: 100, + }, + ) + } + + fn default_assignment() -> chroma_config::assignment::config::AssignmentPolicyConfig { + // IMPORTANT: Must match log service assignment policy (RendezvousHashing + Murmur3) + // since heap and log services are colocated. This ensures that a collection's + // heap operations go to the same node as its log operations. + chroma_config::assignment::config::AssignmentPolicyConfig::RendezvousHashing( + chroma_config::assignment::config::RendezvousHashingAssignmentPolicyConfig { + hasher: chroma_config::assignment::config::HasherType::Murmur3, + }, + ) + } + + fn default_port() -> u16 { + 50052 + } +} + +impl Default for GrpcHeapServiceConfig { + fn default() -> Self { + GrpcHeapServiceConfig { + enabled: GrpcHeapServiceConfig::default_enabled(), + connect_timeout_ms: GrpcHeapServiceConfig::default_connect_timeout_ms(), + request_timeout_ms: GrpcHeapServiceConfig::default_request_timeout_ms(), + max_encoding_message_size: GrpcHeapServiceConfig::default_max_encoding_message_size(), + max_decoding_message_size: GrpcHeapServiceConfig::default_max_decoding_message_size(), + memberlist_provider: GrpcHeapServiceConfig::default_memberlist_provider(), + assignment: GrpcHeapServiceConfig::default_assignment(), + port: GrpcHeapServiceConfig::default_port(), + } + } +} + +/// Configuration for heap service client. +#[derive(Deserialize, Clone, Serialize, Debug)] +pub enum HeapServiceConfig { + /// gRPC-based heap service configuration. + #[serde(alias = "grpc")] + Grpc(GrpcHeapServiceConfig), +} + +impl Default for HeapServiceConfig { + fn default() -> Self { + HeapServiceConfig::Grpc(GrpcHeapServiceConfig::default()) + } +} diff --git a/rust/s3heap-service/src/client/grpc.rs b/rust/s3heap-service/src/client/grpc.rs new file mode 100644 index 00000000000..f49b60ee658 --- /dev/null +++ b/rust/s3heap-service/src/client/grpc.rs @@ -0,0 +1,207 @@ +use std::fmt::Debug; + +use crate::client::config::GrpcHeapServiceConfig; +use async_trait::async_trait; +use chroma_config::assignment::assignment_policy::AssignmentPolicy; +use chroma_config::registry::Registry; +use chroma_config::Configurable; +use chroma_error::{ChromaError, ErrorCodes}; +use chroma_memberlist::client_manager::{ + ClientAssigner, ClientAssignmentError, ClientManager, ClientOptions, +}; +use chroma_memberlist::config::MemberlistProviderConfig; +use chroma_memberlist::memberlist_provider::{ + CustomResourceMemberlistProvider, MemberlistProvider, +}; +use chroma_system::{ComponentHandle, System}; +use chroma_types::chroma_proto::heap_tender_service_client::HeapTenderServiceClient; +use chroma_types::chroma_proto::{self}; +use thiserror::Error; +use tonic::Request; +use tracing::Instrument; + +//////////////// Errors //////////////// + +/// Errors that can occur when interacting with the heap service. +#[derive(Error, Debug)] +pub enum GrpcHeapServiceError { + /// Failed to establish connection to heap service. + #[error("Failed to connect to heap service")] + FailedToConnect(#[from] tonic::transport::Error), + /// Failed to push schedules to heap. + #[error("Failed to push to heap: {0}")] + FailedToPush(#[from] tonic::Status), + /// Failed to prune completed tasks from heap. + #[error("Failed to prune heap: {0}")] + FailedToPrune(tonic::Status), + /// Failed to get heap summary statistics. + #[error("Failed to get heap summary: {0}")] + FailedToGetSummary(tonic::Status), + /// Error from client assignment (e.g., no nodes available). + #[error(transparent)] + ClientAssignerError(#[from] ClientAssignmentError), +} + +impl ChromaError for GrpcHeapServiceError { + fn code(&self) -> ErrorCodes { + match self { + GrpcHeapServiceError::FailedToConnect(_) => ErrorCodes::Internal, + GrpcHeapServiceError::FailedToPush(err) => err.code().into(), + GrpcHeapServiceError::FailedToPrune(err) => err.code().into(), + GrpcHeapServiceError::FailedToGetSummary(err) => err.code().into(), + GrpcHeapServiceError::ClientAssignerError(e) => e.code(), + } + } +} + +type HeapClient = + HeapTenderServiceClient>; + +/// gRPC client for the heap tender service. +/// +/// This client provides access to the heap tender service which manages scheduled tasks. +/// It uses memberlist-based service discovery to find heap service instances that are +/// colocated with log service pods. +#[derive(Clone)] +pub struct GrpcHeapService { + config: GrpcHeapServiceConfig, + client_assigner: ClientAssigner, + // Component handles stored to prevent orphaning - these keep the components alive + _client_manager_handle: ComponentHandle>, + _memberlist_provider_handle: ComponentHandle, +} + +impl Debug for GrpcHeapService { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GrpcHeapService") + .field("config", &self.config) + .field("client_assigner", &self.client_assigner) + .finish() + } +} + +impl GrpcHeapService { + /// Create a new heap service client. + pub fn new( + config: GrpcHeapServiceConfig, + client_assigner: ClientAssigner, + client_manager_handle: ComponentHandle>, + memberlist_provider_handle: ComponentHandle, + ) -> Self { + Self { + config, + client_assigner, + _client_manager_handle: client_manager_handle, + _memberlist_provider_handle: memberlist_provider_handle, + } + } + + /// Check if the heap service is enabled in configuration + pub fn is_enabled(&self) -> bool { + self.config.enabled + } + + /// Check if the heap service client is ready (has discovered nodes) + pub fn is_ready(&self) -> bool { + !self.client_assigner.is_empty() + } + + fn client_for(&mut self, key: &str) -> Result { + // Replication factor is always 1 for heap service so we grab the first assigned client. + self.client_assigner.clients(key)?.drain(..).next().ok_or( + ClientAssignmentError::NoClientFound( + "Improbable state: no client found for key".to_string(), + ), + ) + } + + /// Push schedules to the heap + #[tracing::instrument(skip(self, schedules))] + pub async fn push( + &mut self, + schedules: Vec, + key: &str, + ) -> Result { + let mut client = self.client_for(key)?; + let request = Request::new(chroma_proto::PushRequest { schedules }); + let response = client + .push(request) + .instrument(tracing::info_span!("heap_service_push")) + .await?; + Ok(response.into_inner()) + } + + /// Prune completed tasks from the heap + #[tracing::instrument(skip(self))] + pub async fn prune( + &mut self, + limits: Option, + key: &str, + ) -> Result { + let mut client = self.client_for(key)?; + let request = Request::new(chroma_proto::PruneRequest { limits }); + let response = client + .prune(request) + .instrument(tracing::info_span!("heap_service_prune")) + .await + .map_err(GrpcHeapServiceError::FailedToPrune)?; + Ok(response.into_inner()) + } + + /// Get summary statistics from the heap + #[tracing::instrument(skip(self))] + pub async fn summary( + &mut self, + key: &str, + ) -> Result { + let mut client = self.client_for(key)?; + let request = Request::new(chroma_proto::HeapSummaryRequest {}); + let response = client + .summary(request) + .instrument(tracing::info_span!("heap_service_summary")) + .await + .map_err(GrpcHeapServiceError::FailedToGetSummary)?; + Ok(response.into_inner()) + } +} + +#[async_trait] +impl Configurable<(GrpcHeapServiceConfig, System)> for GrpcHeapService { + async fn try_from_config( + my_config: &(GrpcHeapServiceConfig, System), + registry: &Registry, + ) -> Result> { + let (my_config, system) = my_config; + let assignment_policy = + Box::::try_from_config(&my_config.assignment, registry).await?; + let client_assigner = ClientAssigner::new(assignment_policy, 1); + let client_manager = ClientManager::new( + client_assigner.clone(), + 1, + my_config.connect_timeout_ms, + my_config.request_timeout_ms, + my_config.port, + ClientOptions::new(Some(my_config.max_decoding_message_size)), + ); + let client_manager_handle = system.start_component(client_manager); + + let mut memberlist_provider = match &my_config.memberlist_provider { + MemberlistProviderConfig::CustomResource(_memberlist_provider_config) => { + CustomResourceMemberlistProvider::try_from_config( + &my_config.memberlist_provider, + registry, + ) + .await? + } + }; + memberlist_provider.subscribe(client_manager_handle.receiver()); + let memberlist_provider_handle = system.start_component(memberlist_provider); + + return Ok(GrpcHeapService::new( + my_config.clone(), + client_assigner, + client_manager_handle, + memberlist_provider_handle, + )); + } +} diff --git a/rust/s3heap-service/src/client/mod.rs b/rust/s3heap-service/src/client/mod.rs new file mode 100644 index 00000000000..1a6690ffd6a --- /dev/null +++ b/rust/s3heap-service/src/client/mod.rs @@ -0,0 +1,13 @@ +//! gRPC client for heap tender service. +//! +//! This module provides a client for interacting with the heap tender service, +//! which is colocated with the log service on the same nodes but listens on +//! a different port (50052 vs 50051). + +/// Configuration types for the heap service client. +pub mod config; +/// gRPC client implementation for the heap service. +pub mod grpc; + +pub use config::{GrpcHeapServiceConfig, HeapServiceConfig}; +pub use grpc::{GrpcHeapService, GrpcHeapServiceError}; diff --git a/rust/s3heap-service/src/lib.rs b/rust/s3heap-service/src/lib.rs index ad4dc437c40..c8b5c5f0c35 100644 --- a/rust/s3heap-service/src/lib.rs +++ b/rust/s3heap-service/src/lib.rs @@ -34,6 +34,9 @@ mod scheduler; pub use scheduler::SysDbScheduler; +/// gRPC client for heap tender service +pub mod client; + //////////////////////////////////////////// conversions /////////////////////////////////////////// /// Error type for conversion failures. diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 0e01cc4a2df..e1db6a4a370 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -37,6 +37,8 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use opentelemetry::trace::TraceContextExt; +use s3heap_service::client::GrpcHeapService; +use s3heap_service::client::HeapServiceConfig; use std::collections::HashSet; use std::fmt::Debug; use std::fmt::Formatter; @@ -82,6 +84,8 @@ pub(crate) struct CompactionManagerContext { log: Log, sysdb: SysDb, #[allow(dead_code)] + heap_service: Option, + #[allow(dead_code)] storage: Storage, blockfile_provider: BlockfileProvider, hnsw_index_provider: HnswIndexProvider, @@ -116,6 +120,8 @@ pub(crate) enum CompactionError { FailedToCompact, #[error("Failed to execute task")] FailedToExecuteTask, + #[error("Heap service is not initialized for task based compaction")] + HeapServiceNotInitialized, } impl ChromaError for CompactionError { @@ -123,6 +129,7 @@ impl ChromaError for CompactionError { match self { CompactionError::FailedToCompact => ErrorCodes::Internal, CompactionError::FailedToExecuteTask => ErrorCodes::Internal, + CompactionError::HeapServiceNotInitialized => ErrorCodes::InvalidArgument, } } } @@ -147,7 +154,8 @@ impl CompactionManager { fetch_log_batch_size: u32, purge_dirty_log_timeout_seconds: u64, repair_log_offsets_timeout_seconds: u64, - ) -> Self { + heap_service: Option, + ) -> Result> { let (compact_awaiter_tx, compact_awaiter_rx) = mpsc::channel::(compaction_manager_queue_size); @@ -158,7 +166,17 @@ impl CompactionManager { let compact_awaiter = tokio::spawn(async { compact_awaiter_loop(compact_awaiter_rx, completion_tx).await; }); - CompactionManager { + + if mode == ExecutionMode::Task { + // Check to see if heap_service is Some + if heap_service.is_none() { + tracing::error!( + "Heap service is required for task based compaction but was not initialized" + ); + return Err(Box::new(CompactionError::HeapServiceNotInitialized)); + } + } + Ok(CompactionManager { mode, scheduler, context: CompactionManagerContext { @@ -178,12 +196,13 @@ impl CompactionManager { fetch_log_batch_size, purge_dirty_log_timeout_seconds, repair_log_offsets_timeout_seconds, + heap_service, }, on_next_memberlist_signal: None, compact_awaiter_channel: compact_awaiter_tx, compact_awaiter_completion_channel: completion_rx, compact_awaiter, - } + }) } #[instrument(name = "CompactionManager::start_compaction_batch", skip(self))] @@ -349,6 +368,12 @@ impl CompactionManager { self.context.dispatcher = Some(dispatcher); } + /// Get a mutable reference to the heap service client if available. + #[allow(dead_code)] + pub(crate) fn heap_service(&mut self) -> Option<&mut s3heap_service::client::GrpcHeapService> { + self.context.heap_service.as_mut() + } + fn process_completions(&mut self) -> Vec { let compact_awaiter_completion_channel = &mut self.compact_awaiter_completion_channel; let mut completed_collections = Vec::new(); @@ -456,6 +481,9 @@ impl CompactionManagerContext { self.max_partition_size, self.log.clone(), self.sysdb.clone(), + self.heap_service.ok_or_else(|| { + Box::new(CompactionError::HeapServiceNotInitialized) as Box + })?, self.blockfile_provider.clone(), self.hnsw_index_provider.clone(), self.spann_provider.clone(), @@ -570,7 +598,32 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager { ) .await?; - Ok(CompactionManager::new( + // Initialize heap service if enabled + let heap_service = match &config.heap_service { + HeapServiceConfig::Grpc(heap_config) if heap_config.enabled => { + match GrpcHeapService::try_from_config( + &(heap_config.clone(), system.clone()), + registry, + ) + .await + { + Ok(service) => { + tracing::info!("Heap service client initialized"); + Some(service) + } + Err(err) => { + tracing::warn!("Failed to initialize heap service: {:?}", err); + None + } + } + } + _ => { + tracing::info!("Heap service is disabled"); + None + } + }; + + CompactionManager::new( ExecutionMode::Compaction, // Default to Compaction mode system.clone(), scheduler, @@ -588,7 +641,8 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager { fetch_log_batch_size, purge_dirty_log_timeout_seconds, repair_log_offsets_timeout_seconds, - )) + heap_service, + ) } } pub(crate) async fn create_taskrunner_manager( @@ -612,6 +666,13 @@ pub(crate) async fn create_taskrunner_manager( let assignment_policy = Box::::try_from_config(assignment_policy_config, registry).await?; + let heap_service_config = config.heap_service.clone(); + let heap_service = match heap_service_config { + HeapServiceConfig::Grpc(grpc_config) => { + GrpcHeapService::try_from_config(&(grpc_config, system.clone()), registry).await? + } + }; + let scheduler = Scheduler::new( ExecutionMode::Task, // Taskrunner mode my_ip, @@ -649,7 +710,7 @@ pub(crate) async fn create_taskrunner_manager( ) .await?; - Ok(CompactionManager::new( + CompactionManager::new( ExecutionMode::Task, // Taskrunner mode system.clone(), scheduler, @@ -667,7 +728,8 @@ pub(crate) async fn create_taskrunner_manager( task_config.fetch_log_batch_size, 0, // purge_dirty_log_timeout_seconds not used for tasks 0, // repair_log_offsets_timeout_seconds not used for tasks - )) + Some(heap_service), + ) } async fn compact_awaiter_loop( @@ -1154,7 +1216,9 @@ mod tests { fetch_log_batch_size, purge_dirty_log_timeout_seconds, repair_log_offsets_timeout_seconds, - ); + None, // heap_service not needed in tests + ) + .expect("Failed to create compaction manager in test"); let dispatcher = Dispatcher::new(DispatcherConfig { num_worker_threads: 10, diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 44a7416c869..e5cd816f750 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -224,6 +224,8 @@ pub struct CompactionServiceConfig { #[serde(default)] pub log: chroma_log::config::LogConfig, #[serde(default)] + pub heap_service: s3heap_service::client::HeapServiceConfig, + #[serde(default)] pub dispatcher: chroma_system::DispatcherConfig, #[serde(default)] pub compactor: crate::compactor::config::CompactorConfig, diff --git a/rust/worker/src/execution/operators/finish_task.rs b/rust/worker/src/execution/operators/finish_task.rs index f07df7b13b1..d4806d98c37 100644 --- a/rust/worker/src/execution/operators/finish_task.rs +++ b/rust/worker/src/execution/operators/finish_task.rs @@ -12,13 +12,27 @@ use thiserror::Error; pub struct FinishTaskOperator { log_client: Log, sysdb: SysDb, + heap_service: s3heap_service::client::GrpcHeapService, } impl FinishTaskOperator { /// Create a new finish task operator. + /// + /// # Parameters + /// * `log_client` - Log client for scouting log records + /// * `sysdb` - SysDB client for task state management + /// * `heap_service` - Heap service client for scheduling next task runs (required) #[allow(dead_code)] - pub fn new(log_client: Log, sysdb: SysDb) -> Box { - Box::new(FinishTaskOperator { log_client, sysdb }) + pub fn new( + log_client: Log, + sysdb: SysDb, + heap_service: s3heap_service::client::GrpcHeapService, + ) -> Box { + Box::new(FinishTaskOperator { + log_client, + sysdb, + heap_service, + }) } } @@ -53,6 +67,8 @@ pub enum FinishTaskError { ScoutLogs(String), #[error("Failed to finish task in SysDB: {0}")] SysDb(#[from] SysDbFinishTaskError), + #[error("Failed to schedule task in heap service: {0}")] + HeapService(#[from] s3heap_service::client::GrpcHeapServiceError), } impl ChromaError for FinishTaskError { @@ -60,6 +76,7 @@ impl ChromaError for FinishTaskError { match self { FinishTaskError::ScoutLogs(_) => ErrorCodes::Internal, FinishTaskError::SysDb(e) => e.code(), + FinishTaskError::HeapService(e) => e.code(), } } } @@ -114,7 +131,30 @@ impl Operator for FinishTaskOperator { "Detected new records written during task execution that exceed threshold" ); - // TODO: Schedule a new task for next nonce by pushing to the heap + // Schedule a new task for next nonce by pushing to the heap + let mut heap_service = self.heap_service.clone(); + let schedule = chroma_types::chroma_proto::Schedule { + triggerable: Some(chroma_types::chroma_proto::Triggerable { + partitioning_uuid: input.updated_task.input_collection_id.to_string(), + scheduling_uuid: input.updated_task.id.0.to_string(), + }), + next_scheduled: Some(prost_types::Timestamp::from(input.updated_task.next_run)), + nonce: input.updated_task.next_nonce.0.to_string(), + }; + + heap_service + .push( + vec![schedule], + &input.updated_task.input_collection_id.to_string(), + ) + .await?; + + tracing::info!( + task_id = %input.updated_task.id.0, + collection_id = %input.updated_task.input_collection_id, + next_nonce = %input.updated_task.next_nonce.0, + "Successfully scheduled next task run in heap" + ); } // Step 2: Update lowest_live_nonce to equal next_nonce @@ -160,6 +200,30 @@ mod tests { ) } + async fn get_test_heap_service() -> s3heap_service::client::GrpcHeapService { + use chroma_system::System; + + let system = System::new(); + let registry = chroma_config::registry::Registry::default(); + let config = s3heap_service::client::GrpcHeapServiceConfig { + enabled: true, + port: 50052, + connect_timeout_ms: 5000, + request_timeout_ms: 5000, + ..Default::default() + }; + + let port = config.port; + s3heap_service::client::GrpcHeapService::try_from_config( + &(config, system), + ®istry, + ) + .await + .unwrap_or_else(|_| { + panic!("Failed to create test heap service client - ensure heap service is running on localhost:{}", port) + }) + } + async fn setup_tenant_and_database( sysdb: &mut SysDb, tenant: &str, @@ -237,7 +301,8 @@ mod tests { assert_ne!(task_advanced.next_nonce, initial_nonce); let input = FinishTaskInput::new(task_advanced.clone()); - let operator = FinishTaskOperator::new(log.clone(), sysdb.clone()); + let heap_service = get_test_heap_service().await; + let operator = FinishTaskOperator::new(log.clone(), sysdb.clone(), heap_service); // Run finish_task - should move lowest_live_nonce up to match next_nonce let result = operator.run(&input).await; @@ -289,7 +354,8 @@ mod tests { assert_ne!(nonce_a, nonce_b); let input = FinishTaskInput::new(task_after_advance.clone()); - let operator = FinishTaskOperator::new(log.clone(), sysdb.clone()); + let heap_service = get_test_heap_service().await; + let operator = FinishTaskOperator::new(log.clone(), sysdb.clone(), heap_service); // Run finish_task let result = operator.run(&input).await; @@ -337,7 +403,8 @@ mod tests { }; let input = FinishTaskInput::new(fake_task.clone()); - let operator = FinishTaskOperator::new(log.clone(), sysdb.clone()); + let heap_service = get_test_heap_service().await; + let operator = FinishTaskOperator::new(log.clone(), sysdb.clone(), heap_service); // Run let result = operator.run(&input).await; diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index d7d93bf2f91..30dcbb88eb7 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -33,6 +33,7 @@ use chroma_types::{ SegmentFlushInfo, SegmentType, SegmentUuid, Task, TaskUuid, }; use opentelemetry::trace::TraceContextExt; +use s3heap_service::client::GrpcHeapService; use thiserror::Error; use tokio::sync::oneshot::{error::RecvError, Sender}; use tracing::Span; @@ -199,6 +200,7 @@ pub struct CompactOrchestrator { // === Task Context (optional) === /// Available if this orchestrator is for a task task_context: Option, + heap_service: Option, } #[derive(Error, Debug)] @@ -371,6 +373,7 @@ impl CompactOrchestrator { metrics: CompactOrchestratorMetrics::default(), schema: None, task_context: None, + heap_service: None, } } @@ -383,6 +386,7 @@ impl CompactOrchestrator { max_partition_size: usize, log: Log, sysdb: SysDb, + heap_service: GrpcHeapService, blockfile_provider: BlockfileProvider, hnsw_provider: HnswIndexProvider, spann_provider: SpannProvider, @@ -410,6 +414,7 @@ impl CompactOrchestrator { task: None, execution_nonce, }); + orchestrator.heap_service = Some(heap_service); orchestrator } @@ -997,9 +1002,20 @@ impl Handler> for CompactOrchest self.output_collection_id = output.output_collection_id.into(); if output.should_skip_execution { + let Some(heap_service) = self.heap_service.clone() else { + self.terminate_with_result( + Err(CompactionError::InvariantViolation( + "Heap service not initialized", + )), + ctx, + ) + .await; + return; + }; + // Proceed to FinishTask let task = wrap( - FinishTaskOperator::new(self.log.clone(), self.sysdb.clone()), + FinishTaskOperator::new(self.log.clone(), self.sysdb.clone(), heap_service), FinishTaskInput::new(output.task), ctx.receiver(), self.context.task_cancellation_token.clone(), @@ -1747,7 +1763,19 @@ impl Handler> for CompactOrchestrator updated_task.id.0 ); - let finish_task_op = FinishTaskOperator::new(self.log.clone(), self.sysdb.clone()); + let Some(heap_service) = self.heap_service.clone() else { + self.terminate_with_result( + Err(CompactionError::InvariantViolation( + "Heap service not initialized", + )), + ctx, + ) + .await; + return; + }; + + let finish_task_op = + FinishTaskOperator::new(self.log.clone(), self.sysdb.clone(), heap_service); let finish_task_input = FinishTaskInput::new(updated_task); let task = wrap( @@ -1795,6 +1823,7 @@ mod tests { PrimitiveOperator, Where, }; use regex::Regex; + use s3heap_service::client::{GrpcHeapService, GrpcHeapServiceConfig}; use crate::{ config::RootConfig, @@ -2057,6 +2086,14 @@ mod tests { .expect("Should connect to grpc sysdb"); let mut sysdb = SysDb::Grpc(grpc_sysdb); + // Connect to Grpc Heap Service (requires Tilt running) + let heap_service = GrpcHeapService::try_from_config( + &(GrpcHeapServiceConfig::default(), system.clone()), + ®istry, + ) + .await + .expect("Should connect to grpc heap service"); + let test_segments = TestDistributedSegment::new().await; let mut in_memory_log = InMemoryLog::new(); @@ -2174,6 +2211,7 @@ mod tests { 50, log.clone(), sysdb.clone(), + heap_service.clone(), test_segments.blockfile_provider.clone(), test_segments.hnsw_provider.clone(), test_segments.spann_provider.clone(), @@ -2284,6 +2322,7 @@ mod tests { 50, log_2.clone(), sysdb.clone(), + heap_service.clone(), test_segments.blockfile_provider.clone(), test_segments.hnsw_provider.clone(), test_segments.spann_provider.clone(),