diff --git a/rust/garbage_collector/src/garbage_collector_component.rs b/rust/garbage_collector/src/garbage_collector_component.rs index da4b2e5e75b..ccf51ce9f14 100644 --- a/rust/garbage_collector/src/garbage_collector_component.rs +++ b/rust/garbage_collector/src/garbage_collector_component.rs @@ -62,8 +62,6 @@ enum GarbageCollectCollectionError { #[error("Uninitialized: missing dispatcher or system")] Uninitialized, #[error("Failed to run garbage collection orchestrator: {0}")] - OrchestratorError(#[from] crate::garbage_collector_orchestrator::GarbageCollectorError), - #[error("Failed to run garbage collection orchestrator: {0}")] OrchestratorV2Error(#[from] crate::garbage_collector_orchestrator_v2::GarbageCollectorError), } @@ -166,75 +164,39 @@ impl GarbageCollector { .as_ref() .ok_or(GarbageCollectCollectionError::Uninitialized)?; - if cleanup_mode.is_v2() { - let enable_log_gc = collection.tenant <= self.config.enable_log_gc_for_tenant_threshold - || self - .config - .enable_log_gc_for_tenant - .contains(&collection.tenant); + let enable_log_gc = collection.tenant <= self.config.enable_log_gc_for_tenant_threshold + || self + .config + .enable_log_gc_for_tenant + .contains(&collection.tenant); - let orchestrator = - crate::garbage_collector_orchestrator_v2::GarbageCollectorOrchestrator::new( - collection.id, - collection.version_file_path, - collection.lineage_file_path, - version_absolute_cutoff_time, - collection_soft_delete_absolute_cutoff_time, - self.sysdb_client.clone(), - dispatcher.clone(), - system.clone(), - self.storage.clone(), - self.logs.clone(), - self.root_manager.clone(), - cleanup_mode, - self.config.min_versions_to_keep, - enable_log_gc, - enable_dangerous_option_to_ignore_min_versions_for_wal3, - ); - - let started_at = SystemTime::now(); - let result = match orchestrator.run(system.clone()).await { - Ok(res) => res, - Err(e) => { - tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e); - return 
Err(GarbageCollectCollectionError::OrchestratorV2Error(e)); - } - }; - let duration_ms = started_at - .elapsed() - .map(|d| d.as_millis() as u64) - .unwrap_or(0); - self.job_duration_ms_metric.record(duration_ms, &[]); - self.total_files_deleted_metric.add( - result.num_files_deleted as u64, - &[opentelemetry::KeyValue::new( - "cleanup_mode", - format!("{:?}", cleanup_mode), - )], - ); - self.total_versions_deleted_metric.add( - result.num_versions_deleted as u64, - &[opentelemetry::KeyValue::new( - "cleanup_mode", - format!("{:?}", cleanup_mode), - )], + let orchestrator = + crate::garbage_collector_orchestrator_v2::GarbageCollectorOrchestrator::new( + collection.id, + collection.version_file_path, + collection.lineage_file_path, + version_absolute_cutoff_time, + collection_soft_delete_absolute_cutoff_time, + self.sysdb_client.clone(), + dispatcher.clone(), + system.clone(), + self.storage.clone(), + self.logs.clone(), + self.root_manager.clone(), + cleanup_mode, + self.config.min_versions_to_keep, + enable_log_gc, + enable_dangerous_option_to_ignore_min_versions_for_wal3, ); - return Ok(result); - } - - let orchestrator = crate::garbage_collector_orchestrator::GarbageCollectorOrchestrator::new( - collection.id, - collection.version_file_path, - version_absolute_cutoff_time, - self.sysdb_client.clone(), - dispatcher.clone(), - self.storage.clone(), - cleanup_mode, - ); - let started_at = SystemTime::now(); - let result = orchestrator.run(system.clone()).await?; + let result = match orchestrator.run(system.clone()).await { + Ok(res) => res, + Err(e) => { + tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e); + return Err(GarbageCollectCollectionError::OrchestratorV2Error(e)); + } + }; let duration_ms = started_at .elapsed() .map(|d| d.as_millis() as u64) @@ -254,6 +216,7 @@ impl GarbageCollector { format!("{:?}", cleanup_mode), )], ); + Ok(result) } @@ -390,7 +353,6 @@ impl Handler for GarbageCollector { struct GarbageCollectResult { 
num_completed_jobs: u32, num_failed_jobs: u32, - num_skipped_jobs: u32, num_hard_deleted_databases: u32, } @@ -492,32 +454,23 @@ impl Handler for GarbageCollector { } } - let mut num_skipped_jobs = 0; - let collections_to_gc = collections_to_gc.into_iter().map(|collection| { - let cleanup_mode = if let Some(tenant_mode_overrides) = &self.config.tenant_mode_overrides { - tenant_mode_overrides - .get(&collection.tenant) - .cloned() - .unwrap_or(self.config.default_mode) - } else { - self.config.default_mode - }; - - (cleanup_mode.to_owned(), collection) - }).filter(|(cleanup_mode, collection)| { - if collection.lineage_file_path.is_some() && !cleanup_mode.is_v2() { - tracing::debug!( - "Skipping garbage collection for root of fork tree because GC v1 cannot handle fork trees: {}", - collection.id - ); - num_skipped_jobs += 1; - return false; - } - - true - }) - .take(self.config.max_collections_to_gc as usize) - .collect::>(); + let collections_to_gc = collections_to_gc + .into_iter() + .map(|collection| { + let cleanup_mode = + if let Some(tenant_mode_overrides) = &self.config.tenant_mode_overrides { + tenant_mode_overrides + .get(&collection.tenant) + .cloned() + .unwrap_or(self.config.default_mode) + } else { + self.config.default_mode + }; + + (cleanup_mode.to_owned(), collection) + }) + .take(self.config.max_collections_to_gc as usize) + .collect::>(); tracing::info!( "Filtered to {} collections to garbage collect", @@ -640,7 +593,6 @@ impl Handler for GarbageCollector { return GarbageCollectResult { num_completed_jobs, num_failed_jobs, - num_skipped_jobs, num_hard_deleted_databases: num_hard_deleted_databases as u32, }; } @@ -867,134 +819,6 @@ mod tests { (collection_id, database_name) } - #[tokio::test] - #[traced_test] - async fn test_k8s_integration_ignores_forked_collections() { - let tenant_id = format!("tenant-{}", Uuid::new_v4()); - let tenant_mode_overrides = HashMap::from([(tenant_id.clone(), CleanupMode::Delete)]); - - let config = 
GarbageCollectorConfig { - service_name: "gc".to_string(), - otel_endpoint: "none".to_string(), - otel_filters: vec![OtelFilter { - crate_name: "garbage_collector".to_string(), - filter_level: OtelFilterLevel::Debug, - }], - version_cutoff_time: Duration::from_secs(1), - collection_soft_delete_grace_period: Duration::from_secs(1), - max_collections_to_gc: 100, - max_collections_to_fetch: None, - gc_interval_mins: 10, - disallow_collections: HashSet::new(), - min_versions_to_keep: 2, - filter_min_versions_if_alive: None, - sysdb_config: GrpcSysDbConfig { - host: "localhost".to_string(), - port: 50051, - connect_timeout_ms: 5000, - request_timeout_ms: 10000, - num_channels: 1, - }, - dispatcher_config: DispatcherConfig::default(), - storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await, - default_mode: CleanupMode::DryRun, - tenant_mode_overrides: Some(tenant_mode_overrides), - assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(), - my_member_id: "test-gc".to_string(), - memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig::default(), - port: 50055, - root_cache_config: Default::default(), - jemalloc_pprof_server_port: None, - log: LogConfig::Grpc(GrpcLogConfig::default()), - enable_log_gc_for_tenant: Vec::new(), - enable_log_gc_for_tenant_threshold: "ffffffff-ffff-ffff-ffff-ffffffffffff".to_string(), - enable_dangerous_option_to_ignore_min_versions_for_wal3: false, - }; - let registry = Registry::new(); - - // Create collection - let mut clients = ChromaGrpcClients::new().await.unwrap(); - - let (collection_id, _) = create_test_collection(tenant_id.clone(), &mut clients).await; - let mut sysdb = SysDb::Grpc( - GrpcSysDb::try_from_config(&config.sysdb_config, &registry) - .await - .unwrap(), - ); - let collections = sysdb - .get_collections(GetCollectionsOptions { - collection_id: Some(collection_id), - ..Default::default() - }) - .await - .unwrap(); - let collection =
collections.first().unwrap(); - // Fork collection - sysdb - .fork_collection( - collection_id, - collection.log_position as u64, - collection.log_position as u64, - CollectionUuid::new(), - "test-fork".to_string(), - ) - .await - .unwrap(); - - // Wait 1 second for cutoff time - tokio::time::sleep(Duration::from_secs(1)).await; - - // Run garbage collection - let system = System::new(); - let mut garbage_collector_component = - GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry) - .await - .unwrap(); - - let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry) - .await - .unwrap(); - - let dispatcher_handle = system.start_component(dispatcher); - - garbage_collector_component.set_dispatcher(dispatcher_handle); - garbage_collector_component.set_system(system.clone()); - let mut garbage_collector_handle = system.start_component(garbage_collector_component); - - garbage_collector_handle - .send( - vec![Member { - member_id: "test-gc".to_string(), - member_ip: "0.0.0.0".to_string(), - member_node_name: "test-gc-node".to_string(), - }], - None, - ) - .await - .unwrap(); - - let result = garbage_collector_handle - .request( - GarbageCollectMessage { - tenant: Some(tenant_id.clone()), - }, - Some(Span::current()), - ) - .await - .unwrap(); - - // Should have skipped - assert_eq!( - result, - GarbageCollectResult { - num_completed_jobs: 0, - num_failed_jobs: 0, - num_skipped_jobs: 1, - num_hard_deleted_databases: 0, - } - ); - } - #[tokio::test] #[traced_test] async fn test_k8s_integration_tenant_mode_override() { @@ -1003,7 +827,7 @@ mod tests { let tenant_id_for_dry_run_mode = format!("tenant-dry-run-mode-{}", Uuid::new_v4()); let mut tenant_mode_overrides = HashMap::new(); - tenant_mode_overrides.insert(tenant_id_for_delete_mode.clone(), CleanupMode::Delete); + tenant_mode_overrides.insert(tenant_id_for_delete_mode.clone(), CleanupMode::DeleteV2); let config = GarbageCollectorConfig { service_name: "gc".to_string(), @@
-1029,7 +853,7 @@ mod tests { }, dispatcher_config: DispatcherConfig::default(), storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await, - default_mode: CleanupMode::DryRun, + default_mode: CleanupMode::DryRunV2, tenant_mode_overrides: Some(tenant_mode_overrides), assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(), my_member_id: "test-gc".to_string(), @@ -1140,10 +964,10 @@ mod tests { .await .unwrap(); - // There should be 3 versions left in delete mode, since the version 1 should have been deleted. + // There should be 2 versions left in delete mode, since the versions 0 and 1 should have been deleted. assert_eq!( delete_mode_versions.versions.len(), - 3, + 2, "Expected 3 versions in delete mode, found {}", delete_mode_versions.versions.len() ); @@ -1341,7 +1165,6 @@ mod tests { GarbageCollectResult { num_completed_jobs: 1, num_failed_jobs: 0, - num_skipped_jobs: 0, num_hard_deleted_databases: 0, // The database should not have been hard deleted yet } ); @@ -1364,7 +1187,6 @@ mod tests { GarbageCollectResult { num_completed_jobs: 1, num_failed_jobs: 0, - num_skipped_jobs: 0, num_hard_deleted_databases: 1, // The database should have been hard deleted } ); diff --git a/rust/garbage_collector/src/garbage_collector_orchestrator.rs b/rust/garbage_collector/src/garbage_collector_orchestrator.rs deleted file mode 100644 index 9c16e67c312..00000000000 --- a/rust/garbage_collector/src/garbage_collector_orchestrator.rs +++ /dev/null @@ -1,1245 +0,0 @@ -//! Garbage Collection Pipeline -//! -//! The garbage collection process follows these stages: -//! -//! 1. Fetch Version File (FetchVersionFileOperator) -//! - Retrieves the collection version file from storage -//! - Input: Version file path -//! - Output: Version file content -//! -//! 2. Compute Versions to Delete (ComputeVersionsToDeleteOperator) -//! - Identifies versions older than cutoff time while preserving minimum required versions -//! 
- Input: Version file, cutoff time, minimum versions to keep -//! - Output: List of versions to delete -//! -//! 3. Mark Versions at SysDB (MarkVersionsAtSysDbOperator) -//! - Marks identified versions for deletion in the system database -//! - Input: Version file, versions to delete, epoch ID -//! - Output: Marked versions confirmation -//! -//! 4. Fetch Sparse Index Files (FetchSparseIndexFilesOperator) -//! - Retrieves sparse index files for versions marked for deletion -//! - Input: Version file, versions to delete -//! - Output: Map of version IDs to file contents -//! -//! 5. Compute Unused Files (ComputeUnusedBetweenVersionsOperator) -//! - Analyzes sparse index files to identify S3 files no longer referenced -//! - Input: Version file, version contents -//! - Output: Set of unused S3 file paths -//! -//! 6. Delete Unused Files (DeleteUnusedFilesOperator) -//! - Deletes unused S3 files -//! - Input: Set of unused S3 file paths -//! - Output: Deletion confirmation -//! -//! 7. Delete Versions (DeleteVersionsAtSysDbOperator) -//! - Permanently deletes marked versions from the system database -//! - Input: Version file, versions to delete, unused S3 files -//! 
- Output: Deletion confirmation - -use crate::types::{CleanupMode, GarbageCollectorResponse}; -use async_trait::async_trait; -use chroma_error::{ChromaError, ErrorCodes}; -use chroma_storage::Storage; -use chroma_sysdb::SysDb; -use chroma_system::{ - wrap, ChannelError, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, - OrchestratorContext, PanicError, TaskError, TaskMessage, TaskResult, -}; -use chroma_types::chroma_proto::CollectionVersionFile; -use chroma_types::CollectionUuid; -use chrono::{DateTime, Utc}; -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; -use thiserror::Error; -use tokio::sync::oneshot::{error::RecvError, Sender}; -use tracing::Span; - -use crate::operators::compute_unused_files::{ - ComputeUnusedFilesError, ComputeUnusedFilesInput, ComputeUnusedFilesOperator, - ComputeUnusedFilesOutput, -}; -use crate::operators::compute_versions_to_delete::{ - ComputeVersionsToDeleteError, ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOperator, - ComputeVersionsToDeleteOutput, -}; -use crate::operators::delete_unused_files::{ - DeleteUnusedFilesError, DeleteUnusedFilesInput, DeleteUnusedFilesOperator, - DeleteUnusedFilesOutput, -}; -use crate::operators::delete_versions_at_sysdb::{ - DeleteVersionsAtSysDbError, DeleteVersionsAtSysDbInput, DeleteVersionsAtSysDbOperator, - DeleteVersionsAtSysDbOutput, -}; -use crate::operators::fetch_version_file::{ - FetchVersionFileError, FetchVersionFileInput, FetchVersionFileOperator, FetchVersionFileOutput, -}; -use crate::operators::mark_versions_at_sysdb::{ - MarkVersionsAtSysDbError, MarkVersionsAtSysDbInput, MarkVersionsAtSysDbOperator, - MarkVersionsAtSysDbOutput, -}; - -pub struct GarbageCollectorOrchestrator { - collection_id: CollectionUuid, - version_file_path: String, - absolute_cutoff_time: DateTime, - sysdb_client: SysDb, - context: OrchestratorContext, - storage: Storage, - result_channel: Option>>, - pending_version_file: Option>, - pending_versions_to_delete: Option, - 
pending_epoch_id: Option, - num_versions_deleted: u32, - deletion_list: Vec, - cleanup_mode: CleanupMode, -} - -impl Debug for GarbageCollectorOrchestrator { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("GarbageCollector").finish() - } -} - -#[allow(clippy::too_many_arguments)] -impl GarbageCollectorOrchestrator { - pub fn new( - collection_id: CollectionUuid, - version_file_path: String, - absolute_cutoff_time: DateTime, - sysdb_client: SysDb, - dispatcher: ComponentHandle, - storage: Storage, - cleanup_mode: CleanupMode, - ) -> Self { - Self { - collection_id, - version_file_path, - absolute_cutoff_time, - sysdb_client, - context: OrchestratorContext::new(dispatcher), - storage, - cleanup_mode, - result_channel: None, - pending_version_file: None, - pending_versions_to_delete: None, - pending_epoch_id: None, - num_versions_deleted: 0, - deletion_list: Vec::new(), - } - } -} - -#[derive(Error, Debug)] -pub enum GarbageCollectorError { - #[error("FetchVersionFile error: {0}")] - FetchVersionFile(#[from] FetchVersionFileError), - #[error("Panic during compaction: {0}")] - Panic(#[from] PanicError), - #[error("Error sending message through channel: {0}")] - Channel(#[from] ChannelError), - #[error("Error receiving final result: {0}")] - Result(#[from] RecvError), - #[error("{0}")] - Generic(#[from] Box), - #[error("ComputeVersionsToDelete error: {0}")] - ComputeVersionsToDelete(#[from] ComputeVersionsToDeleteError), - #[error("MarkVersionsAtSysDb error: {0}")] - MarkVersionsAtSysDb(#[from] MarkVersionsAtSysDbError), - #[error("ComputeUnusedFiles error: {0}")] - ComputeUnusedFiles(#[from] ComputeUnusedFilesError), - #[error("DeleteVersionsAtSysDb error: {0}")] - DeleteVersionsAtSysDb(#[from] DeleteVersionsAtSysDbError), - #[error("The task was aborted because resources were exhausted")] - Aborted, - #[error("DeleteUnusedFiles error: {0}")] - DeleteUnusedFiles(#[from] DeleteUnusedFilesError), -} - -impl ChromaError for 
GarbageCollectorError { - fn code(&self) -> ErrorCodes { - ErrorCodes::Internal - } -} - -impl From> for GarbageCollectorError -where - E: Into, -{ - fn from(value: TaskError) -> Self { - match value { - TaskError::Panic(e) => GarbageCollectorError::Panic(e), - TaskError::TaskFailed(e) => e.into(), - TaskError::Aborted => GarbageCollectorError::Aborted, - } - } -} - -#[async_trait] -impl Orchestrator for GarbageCollectorOrchestrator { - type Output = GarbageCollectorResponse; - type Error = GarbageCollectorError; - - fn dispatcher(&self) -> ComponentHandle { - self.context.dispatcher.clone() - } - - fn context(&self) -> &OrchestratorContext { - &self.context - } - - async fn initial_tasks( - &mut self, - ctx: &ComponentContext, - ) -> Vec<(TaskMessage, Option)> { - tracing::info!( - path = %self.version_file_path, - "Creating initial fetch version file task" - ); - - vec![( - wrap( - Box::new(FetchVersionFileOperator {}), - FetchVersionFileInput::new(self.version_file_path.clone(), self.storage.clone()), - ctx.receiver(), - self.context.task_cancellation_token.clone(), - ), - Some(Span::current()), - )] - } - - fn set_result_channel( - &mut self, - sender: Sender>, - ) { - self.result_channel = Some(sender); - } - - fn take_result_channel( - &mut self, - ) -> Option>> { - self.result_channel.take() - } -} - -#[async_trait] -impl Handler> - for GarbageCollectorOrchestrator -{ - type Result = (); - - async fn handle( - &mut self, - message: TaskResult, - ctx: &ComponentContext, - ) { - tracing::info!("Processing FetchVersionFile result"); - - // Stage 1: Process fetched version file and initiate version computation - let output = match self.ok_or_terminate(message.into_inner(), ctx).await { - Some(output) => output, - None => { - tracing::error!("Failed to get version file output"); - return; - } - }; - let version_file = output.file; - - tracing::info!("Creating compute versions task"); - let compute_task = wrap( - Box::new(ComputeVersionsToDeleteOperator {}), - 
ComputeVersionsToDeleteInput { - version_file, - cutoff_time: self.absolute_cutoff_time, - min_versions_to_keep: 2, - }, - ctx.receiver(), - self.context.task_cancellation_token.clone(), - ); - - tracing::info!("Sending compute versions task to dispatcher"); - if let Err(e) = self - .dispatcher() - .send(compute_task, Some(Span::current())) - .await - { - tracing::error!(error = ?e, "Failed to send compute task to dispatcher"); - self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx) - .await; - return; - } - tracing::info!("Successfully sent compute versions task"); - } -} - -#[async_trait] -impl Handler> - for GarbageCollectorOrchestrator -{ - type Result = (); - - async fn handle( - &mut self, - message: TaskResult, - ctx: &ComponentContext, - ) { - // Stage 2: Process computed versions and initiate marking in SysDB - let output = match self.ok_or_terminate(message.into_inner(), ctx).await { - Some(output) => output, - None => return, - }; - - // If no versions to delete, terminate early with success - if output.versions_to_delete.versions.is_empty() { - tracing::info!("No versions to delete, terminating garbage collection early"); - let response = GarbageCollectorResponse { - collection_id: self.collection_id, - num_versions_deleted: 0, - num_files_deleted: 0, - ..Default::default() - }; - tracing::info!(?response, "Garbage collection completed early"); - self.terminate_with_result(Ok(response), ctx).await; - // Signal the dispatcher to shut down - return; - } - - self.num_versions_deleted = output.versions_to_delete.versions.len() as u32; - self.pending_versions_to_delete = Some(output.versions_to_delete.clone()); - self.pending_version_file = Some(output.version_file.clone()); - - let mark_task = wrap( - Box::new(MarkVersionsAtSysDbOperator {}), - MarkVersionsAtSysDbInput { - version_file: output.version_file, - versions_to_delete: output.versions_to_delete, - sysdb_client: self.sysdb_client.clone(), - epoch_id: 0, - oldest_version_to_keep: 
output.oldest_version_to_keep, - }, - ctx.receiver(), - self.context.task_cancellation_token.clone(), - ); - - if let Err(e) = self - .dispatcher() - .send(mark_task, Some(Span::current())) - .await - { - self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx) - .await; - // Signal the dispatcher to shut down - return; - } - } -} - -#[async_trait] -impl Handler> - for GarbageCollectorOrchestrator -{ - type Result = (); - - async fn handle( - &mut self, - message: TaskResult, - ctx: &ComponentContext, - ) { - // Stage 3: After marking versions, compute unused files - let output = match self.ok_or_terminate(message.into_inner(), ctx).await { - Some(output) => output, - None => return, - }; - - let compute_task = wrap( - Box::new(ComputeUnusedFilesOperator::new( - self.collection_id.to_string(), - self.storage.clone(), - 2, // min_versions_to_keep - )), - ComputeUnusedFilesInput { - version_file: output.version_file, - versions_to_delete: output.versions_to_delete, - oldest_version_to_keep: output.oldest_version_to_keep, - }, - ctx.receiver(), - self.context.task_cancellation_token.clone(), - ); - - if let Err(e) = self - .dispatcher() - .send(compute_task, Some(Span::current())) - .await - { - self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx) - .await; - return; - } - } -} - -#[async_trait] -impl Handler> - for GarbageCollectorOrchestrator -{ - type Result = (); - - async fn handle( - &mut self, - message: TaskResult, - ctx: &ComponentContext, - ) { - // Stage 4: After identifying unused files, delete them - let output = match self.ok_or_terminate(message.into_inner(), ctx).await { - Some(output) => output, - None => return, - }; - - let delete_task = wrap( - Box::new(DeleteUnusedFilesOperator::new( - self.storage.clone(), - self.cleanup_mode, - self.collection_id.to_string(), - )), - DeleteUnusedFilesInput { - unused_s3_files: output.unused_block_ids.into_iter().collect(), - hnsw_prefixes_for_deletion: 
output.unused_hnsw_prefixes, - }, - ctx.receiver(), - self.context.task_cancellation_token.clone(), - ); - - if let Err(e) = self - .dispatcher() - .send(delete_task, Some(Span::current())) - .await - { - self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx) - .await; - return; - } - - // Store state needed for final deletion - self.pending_epoch_id = Some(0); // TODO: Get this from somewhere - } -} - -#[async_trait] -impl Handler> - for GarbageCollectorOrchestrator -{ - type Result = (); - - async fn handle( - &mut self, - message: TaskResult, - ctx: &ComponentContext, - ) { - // Stage 6: After deleting unused files, delete the versions - let output = match self.ok_or_terminate(message.into_inner(), ctx).await { - Some(output) => output, - None => return, - }; - - if self.cleanup_mode == CleanupMode::DryRun { - tracing::info!("Dry run mode, skipping actual deletion"); - let response = GarbageCollectorResponse { - collection_id: self.collection_id, - num_versions_deleted: 0, - num_files_deleted: 0, - ..Default::default() - }; - self.terminate_with_result(Ok(response), ctx).await; - return; - } - - // Get stored state - let version_file = self - .pending_version_file - .take() - .expect("Version file should be set"); - let versions_to_delete = self - .pending_versions_to_delete - .take() - .expect("Versions to delete should be set"); - let epoch_id = self - .pending_epoch_id - .take() - .expect("Epoch ID should be set"); - - let delete_versions_task = wrap( - Box::new(DeleteVersionsAtSysDbOperator { - storage: self.storage.clone(), - }), - DeleteVersionsAtSysDbInput { - version_file, - epoch_id, - sysdb_client: self.sysdb_client.clone(), - versions_to_delete, - }, - ctx.receiver(), - self.context.task_cancellation_token.clone(), - ); - - // Update the deletion list so that GarbageCollectorOrchestrator can use it in the final stage. 
- self.deletion_list = output.deleted_files.clone().into_iter().collect(); - - if let Err(e) = self - .dispatcher() - .send(delete_versions_task, Some(Span::current())) - .await - { - self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx) - .await; - return; - } - } -} - -#[async_trait] -impl Handler> - for GarbageCollectorOrchestrator -{ - type Result = (); - - async fn handle( - &mut self, - message: TaskResult, - ctx: &ComponentContext, - ) { - // Stage 6: Final stage - versions deleted, complete the garbage collection process - let _output = match self.ok_or_terminate(message.into_inner(), ctx).await { - Some(output) => output, - None => return, - }; - - #[expect(deprecated)] - let response = GarbageCollectorResponse { - collection_id: self.collection_id, - num_versions_deleted: self.num_versions_deleted, - num_files_deleted: self.deletion_list.len() as u32, - deletion_list: self.deletion_list.clone(), - }; - - self.terminate_with_result(Ok(response), ctx).await; - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::helper::ChromaGrpcClients; - use chroma_config::registry::Registry; - use chroma_config::Configurable; - use chroma_storage::s3_config_for_localhost_with_bucket_name; - use chroma_storage::GetOptions; - use chroma_sysdb::{GrpcSysDbConfig, SysDbConfig}; - use chroma_system::System; - use std::str::FromStr; - use std::time::{Duration, SystemTime}; - use tracing_test::traced_test; - use uuid::Uuid; - - #[allow(dead_code)] - async fn wait_for_new_version( - clients: &mut ChromaGrpcClients, - collection_id: String, - tenant_id: String, - current_version_count: usize, - max_attempts: usize, - ) -> Result<(), Box> { - for attempt in 1..=max_attempts { - tracing::info!( - attempt, - max_attempts, - collection_id, - "Waiting for new version to be created..." 
- ); - - tokio::time::sleep(Duration::from_secs(2)).await; - - let versions = clients - .list_collection_versions( - collection_id.clone(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await?; - - if versions.versions.len() > current_version_count { - tracing::info!( - previous_count = current_version_count, - new_count = versions.versions.len(), - "New version detected" - ); - return Ok(()); - } - } - - Err("Timeout waiting for new version to be created".into()) - } - - const TEST_COLLECTIONS_SIZE: usize = 33; - - async fn validate_test_collection( - clients: &mut ChromaGrpcClients, - collection_id: CollectionUuid, - ) { - let results = clients - .get_records(collection_id.to_string(), None, true, false, false) - .await - .unwrap(); - - // Verify all IDs are still present - for i in 0..TEST_COLLECTIONS_SIZE { - let expected_id = format!("id{}", i); - assert!( - results.ids.contains(&expected_id), - "Expected to find {}", - expected_id - ); - } - - // Verify embeddings are unchanged - if let Some(returned_embeddings) = results.embeddings { - assert_eq!( - returned_embeddings.len(), - TEST_COLLECTIONS_SIZE, - "Expected {} embeddings", - TEST_COLLECTIONS_SIZE - ); - - // Compare with expected embeddings - for (i, embedding) in returned_embeddings.iter().enumerate() { - let mut expected_embedding = vec![0.0; 3]; - expected_embedding[i % 3] = 1.0; - assert_eq!( - embedding, &expected_embedding, - "Expected embedding for ID {} to be {:?}", - i, expected_embedding - ); - } - } else { - panic!("Expected embeddings in results"); - } - } - - async fn create_test_collection( - clients: &mut ChromaGrpcClients, - enable_spann: bool, - ) -> (CollectionUuid, String, Uuid, Uuid) { - // Create unique identifiers for tenant and database - let test_uuid = uuid::Uuid::new_v4(); - let tenant_id = format!("test_tenant_{}", test_uuid); - let database_name = format!("test_db_{}", test_uuid); - let collection_name = format!("test_collection_{}", test_uuid); - - 
tracing::info!( - tenant_id = %tenant_id, - database = %database_name, - collection = %collection_name, - "Starting test with resources" - ); - - let collection_id = clients - .create_database_and_collection( - &tenant_id, - &database_name, - &collection_name, - enable_spann, - ) - .await - .unwrap(); - - tracing::info!(collection_id = %collection_id, "Created collection"); - - let mut embeddings = vec![]; - let mut ids = vec![]; - - for i in 0..TEST_COLLECTIONS_SIZE { - let mut embedding = vec![0.0; 3]; - embedding[i % 3] = 1.0; - embeddings.push(embedding); - ids.push(format!("id{}", i)); - } - - // Get initial version count - let initial_versions = clients - .list_collection_versions( - collection_id.clone(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await - .unwrap(); - let initial_version_count = initial_versions.versions.len(); - - tracing::info!( - initial_count = initial_version_count, - "Initial version count" - ); - - // Add first batch of 11 records - tracing::info!("Adding first batch of embeddings"); - clients - .add_embeddings( - &collection_id, - embeddings[..11].to_vec(), - ids[..11].to_vec(), - ) - .await - .unwrap(); - - // Wait for new version after first batch - wait_for_new_version( - clients, - collection_id.clone(), - tenant_id.clone(), - initial_version_count, - 10, - ) - .await - .unwrap(); - - // Add second batch of 11 records - tracing::info!("Adding second batch of embeddings"); - clients - .add_embeddings( - &collection_id, - embeddings[11..22].to_vec(), - ids[11..22].to_vec(), - ) - .await - .unwrap(); - // Wait for new version after first batch - wait_for_new_version( - clients, - collection_id.clone(), - tenant_id.clone(), - initial_version_count + 1, - 10, - ) - .await - .unwrap(); - - // After adding second batch and waiting for version, add a third batch - tracing::info!("Adding third batch of embeddings (modified records)"); - clients - .add_embeddings( - &collection_id, - embeddings[22..].to_vec(), - 
ids[22..].to_vec(), - ) - .await - .unwrap(); - - wait_for_new_version( - clients, - collection_id.clone(), - tenant_id.clone(), - initial_version_count + 2, - 10, - ) - .await - .unwrap(); - - let collection_id = CollectionUuid::from_str(&collection_id).unwrap(); - - validate_test_collection(clients, collection_id).await; - - let collection_with_segments = clients - .get_collection_with_segments(collection_id.to_string()) - .await - .unwrap(); - let db_id = collection_with_segments - .collection - .expect("Expected collection to be found") - .database_id - .expect("Expected database ID to be present"); - let db_id = Uuid::from_str(&db_id).expect("Failed to parse database ID"); - - let mut segment_id_str = String::from(""); - for segment in collection_with_segments.segments { - if segment.r#type == "urn:chroma:segment/vector/spann" - || segment.r#type == "urn:chroma:segment/vector/hnsw-distributed" - { - segment_id_str = segment.id.clone(); - } - } - let segment_id = - Uuid::from_str(&segment_id_str).expect("Failed to parse segment ID from collection"); - - (collection_id, tenant_id, db_id, segment_id) - } - - async fn get_hnsw_index_ids(storage: &Storage, s3_path: &str) -> Vec { - storage - .list_prefix(s3_path, GetOptions::default()) - .await - .unwrap() - .into_iter() - .filter(|path| path.contains(&format!("{}/", s3_path))) - .map(|path| Uuid::from_str(path.split("/").nth(9).unwrap()).unwrap()) - .collect::>() // de-dupe - .into_iter() - .collect() - } - - async fn test_k8s_integration_check_end_to_end(use_spann: bool) { - // Create storage config and storage client - let storage_config = s3_config_for_localhost_with_bucket_name("chroma-storage").await; - - let registry = Registry::new(); - let storage = Storage::try_from_config(&storage_config, &registry) - .await - .unwrap(); - - let mut clients = ChromaGrpcClients::new().await.unwrap(); - let (collection_id, tenant_id, db_id, segment_id) = - create_test_collection(&mut clients, use_spann).await; - - let s3_path
= format!( - "tenant/{}/database/{}/collection/{}/segment/{}/hnsw", - tenant_id, db_id, collection_id, segment_id - ); - let hnsw_index_ids_before_gc = get_hnsw_index_ids(&storage, &s3_path).await; - - // Get version count before GC - let versions_before_gc = clients - .list_collection_versions( - collection_id.to_string(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await - .unwrap(); - let unique_versions_before_gc = versions_before_gc - .versions - .iter() - .map(|v| v.version) - .collect::>() - .len(); - assert_eq!( - unique_versions_before_gc, 4, - "Expected 4 unique versions before starting garbage collection" - ); - - // After creating versions and verifying records, start garbage collection: - tracing::info!("Starting garbage collection process"); - - let system = System::new(); - let dispatcher = Dispatcher::new(chroma_system::DispatcherConfig::default()); - let dispatcher_handle = system.start_component(dispatcher); - let sysdb_config = SysDbConfig::Grpc(GrpcSysDbConfig { - host: "localhost".to_string(), - port: 50051, - connect_timeout_ms: 5000, - request_timeout_ms: 10000, - num_channels: 1, - }); - let mut sysdb = SysDb::try_from_config(&sysdb_config, ®istry) - .await - .unwrap(); - - // Get collection info for GC from sysdb - let collections_to_gc = sysdb - .get_collections_to_gc(None, None, None, None) - .await - .unwrap(); - let collection_info = collections_to_gc - .iter() - .find(|c| c.id == collection_id) - .expect("Collection should be available for GC"); - - // Create orchestrator with correct version file path - let orchestrator = GarbageCollectorOrchestrator::new( - collection_id, - collection_info.version_file_path.clone(), - SystemTime::now().into(), // immediately expire versions - sysdb, - dispatcher_handle, - storage.clone(), - CleanupMode::Delete, - ); - - tracing::info!("Running orchestrator"); - let result = orchestrator.run(system).await.unwrap(); - assert_eq!(result.num_versions_deleted, 1); - - // After running 
GC and waiting for result, verify versions were deleted - let versions_after_gc = clients - .list_collection_versions( - collection_id.to_string(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await - .unwrap(); - - let unique_versions_after_gc = versions_after_gc - .versions - .iter() - .map(|v| v.version) - .collect::>() - .len(); - - tracing::info!( - before = unique_versions_before_gc, - after = unique_versions_after_gc, - "Unique version counts before and after GC" - ); - - assert!( - unique_versions_after_gc >= 2, - "Expected at least 2 unique versions to remain after garbage collection (min_versions_to_keep)" - ); - - // Check HNSW indices - let hnsw_index_ids_after_gc = get_hnsw_index_ids(&storage, &s3_path).await; - tracing::info!( - before = ?hnsw_index_ids_before_gc, - after = ?hnsw_index_ids_after_gc, - "HNSW index IDs before and after GC" - ); - - assert_eq!( - hnsw_index_ids_before_gc.len() - hnsw_index_ids_after_gc.len(), - result.num_versions_deleted as usize, - "Expected {} HNSW indices to be deleted after garbage collection", - result.num_versions_deleted - ); - - tracing::info!("Verifying records are still accessible after GC"); - validate_test_collection(&mut clients, collection_id).await; - } - - #[tokio::test] - #[traced_test] - async fn test_k8s_integration_check_end_to_end_hnsw() { - test_k8s_integration_check_end_to_end(false).await; - } - - #[tokio::test] - #[traced_test] - async fn test_k8s_integration_check_end_to_end_spann() { - test_k8s_integration_check_end_to_end(true).await; - } - - #[tokio::test] - #[traced_test] - async fn test_k8s_integration_soft_delete() { - // Create storage config and storage client - let storage_config = s3_config_for_localhost_with_bucket_name("chroma-storage").await; - - let registry = Registry::new(); - let storage = Storage::try_from_config(&storage_config, ®istry) - .await - .unwrap(); - - let deleted_hnsw_files_before_test: Vec<_> = storage - .list_prefix("gc", 
GetOptions::default()) - .await - .unwrap() - .into_iter() - .filter(|path| path.contains("gc") && path.contains("header.bin")) - .collect(); - - let mut clients = ChromaGrpcClients::new().await.unwrap(); - let (collection_id, tenant_id, db_id, segment_id) = - create_test_collection(&mut clients, true).await; - - let s3_path = format!( - "tenant/{}/database/{}/collection/{}/segment/{}/hnsw", - tenant_id, db_id, collection_id, segment_id - ); - let hnsw_index_ids_before_gc = get_hnsw_index_ids(&storage, &s3_path).await; - - // Get version count before GC - let versions_before_gc = clients - .list_collection_versions( - collection_id.to_string(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await - .unwrap(); - let unique_versions_before_gc = versions_before_gc - .versions - .iter() - .map(|v| v.version) - .collect::>() - .len(); - assert_eq!( - unique_versions_before_gc, 4, - "Expected 4 unique versions before starting garbage collection" - ); - - // After creating versions and verifying records, start garbage collection: - tracing::info!("Starting garbage collection process"); - - let system = System::new(); - let dispatcher = Dispatcher::new(chroma_system::DispatcherConfig::default()); - let dispatcher_handle = system.start_component(dispatcher); - let sysdb_config = SysDbConfig::Grpc(GrpcSysDbConfig { - host: "localhost".to_string(), - port: 50051, - connect_timeout_ms: 5000, - request_timeout_ms: 10000, - num_channels: 1, - }); - let mut sysdb = SysDb::try_from_config(&sysdb_config, ®istry) - .await - .unwrap(); - - // Get collection info for GC from sysdb - let collections_to_gc = sysdb - .get_collections_to_gc(None, None, None, None) - .await - .unwrap(); - let collection_info = collections_to_gc - .iter() - .find(|c| c.id == collection_id) - .expect("Collection should be available for GC"); - - // Create orchestrator with correct version file path - let orchestrator = GarbageCollectorOrchestrator::new( - collection_id, - 
collection_info.version_file_path.clone(), - SystemTime::now().into(), // immediately expire versions - sysdb, - dispatcher_handle, - storage.clone(), - CleanupMode::Rename, - ); - - tracing::info!("Running orchestrator"); - let result = orchestrator.run(system).await.unwrap(); - assert_eq!(result.num_versions_deleted, 1); - - // After running GC and waiting for result, verify versions were deleted - let versions_after_gc = clients - .list_collection_versions( - collection_id.to_string(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await - .unwrap(); - - let unique_versions_after_gc = versions_after_gc - .versions - .iter() - .map(|v| v.version) - .collect::>() - .len(); - - tracing::info!( - before = unique_versions_before_gc, - after = unique_versions_after_gc, - "Unique version counts before and after GC" - ); - - assert!( - unique_versions_after_gc >= 2, - "Expected at least 2 unique versions to remain after garbage collection (min_versions_to_keep)" - ); - - // Check HNSW indices - let hnsw_index_ids_after_gc = get_hnsw_index_ids(&storage, &s3_path).await; - tracing::info!( - before = ?hnsw_index_ids_before_gc, - after = ?hnsw_index_ids_after_gc, - "HNSW index IDs before and after GC" - ); - - assert_eq!( - hnsw_index_ids_before_gc.len() - hnsw_index_ids_after_gc.len(), - result.num_versions_deleted as usize, - "Expected {} HNSW indices to be deleted after garbage collection", - result.num_versions_deleted - ); - - tracing::info!("Verifying records are still accessible after GC"); - validate_test_collection(&mut clients, collection_id).await; - - // Verify that "deleted" files are renamed with the "gc" prefix - let deleted_hnsw_files: Vec<_> = storage - .list_prefix("gc", GetOptions::default()) - .await - .unwrap() - .into_iter() - .filter(|path| path.contains("gc") && path.contains("header.bin")) - .collect(); - - tracing::info!( - count = deleted_hnsw_files.len(), - files = ?deleted_hnsw_files, - "Soft-deleted HNSW header files" - ); - - 
// The number of moved files should match the difference in versions - assert_eq!( - deleted_hnsw_files.len() - deleted_hnsw_files_before_test.len(), - unique_versions_before_gc - unique_versions_after_gc, - "Expected renamed HNSW files to match the number of deleted unique versions" - ); - } - - #[tokio::test] - #[traced_test] - async fn test_k8s_integration_dry_run() { - // Create storage config and storage client - let storage_config = s3_config_for_localhost_with_bucket_name("chroma-storage").await; - - let registry = Registry::new(); - let storage = Storage::try_from_config(&storage_config, ®istry) - .await - .unwrap(); - - let mut clients = ChromaGrpcClients::new().await.unwrap(); - let (collection_id, tenant_id, db_id, segment_id) = - create_test_collection(&mut clients, true).await; - - let s3_path = format!( - "tenant/{}/database/{}/collection/{}/segment/{}/hnsw", - tenant_id, db_id, collection_id, segment_id - ); - let hnsw_index_ids_before_gc = get_hnsw_index_ids(&storage, &s3_path).await; - - // Get version count before GC - let versions_before_gc = clients - .list_collection_versions( - collection_id.to_string(), - tenant_id.clone(), - Some(100), - None, - None, - None, - ) - .await - .unwrap(); - let unique_versions_before_gc = versions_before_gc - .versions - .iter() - .map(|v| v.version) - .collect::>() - .len(); - assert_eq!( - unique_versions_before_gc, 4, - "Expected 4 unique versions before starting garbage collection" - ); - - // After creating versions and verifying records, start garbage collection: - tracing::info!("Starting garbage collection process"); - - let system = System::new(); - let dispatcher = Dispatcher::new(chroma_system::DispatcherConfig::default()); - let dispatcher_handle = system.start_component(dispatcher); - let sysdb_config = SysDbConfig::Grpc(GrpcSysDbConfig { - host: "localhost".to_string(), - port: 50051, - connect_timeout_ms: 5000, - request_timeout_ms: 10000, - num_channels: 1, - }); - let mut sysdb = 
SysDb::try_from_config(&sysdb_config, ®istry) - .await - .unwrap(); - - // Get collection info for GC from sysdb - let collections_to_gc = sysdb - .get_collections_to_gc(None, None, None, None) - .await - .unwrap(); - let collection_info = collections_to_gc - .iter() - .find(|c| c.id == collection_id) - .expect("Collection should be available for GC"); - - // Create orchestrator with correct version file path - let orchestrator = GarbageCollectorOrchestrator::new( - collection_id, - collection_info.version_file_path.clone(), - SystemTime::now().into(), // immediately expire versions - sysdb, - dispatcher_handle, - storage.clone(), - CleanupMode::DryRun, - ); - - tracing::info!("Running orchestrator"); - let result = orchestrator.run(system).await.unwrap(); - assert_eq!(result.num_versions_deleted, 0); - - // After running GC and waiting for result, verify versions were deleted - let versions_after_gc = clients - .list_collection_versions( - collection_id.to_string(), - tenant_id.clone(), - Some(100), - None, - None, - Some(true), // include versions marked for deletion - ) - .await - .unwrap(); - - // Expect 2 versions to be marked for deletion, but not actually deleted - let num_versions_marked_for_deletion = versions_after_gc - .versions - .iter() - .filter(|v| v.marked_for_deletion) - .count(); - assert_eq!( - num_versions_marked_for_deletion, 1, - "Expected 1 version to be marked for deletion in dry run mode" - ); - - let unique_versions_after_gc = versions_after_gc - .versions - .iter() - .map(|v| v.version) - .collect::>() - .len(); - - assert_eq!( - unique_versions_after_gc, unique_versions_before_gc, - "Expected no versions to be deleted in dry run mode" - ); - - // Check HNSW indices - let hnsw_index_ids_after_gc = get_hnsw_index_ids(&storage, &s3_path).await; - tracing::info!( - before = ?hnsw_index_ids_before_gc, - after = ?hnsw_index_ids_after_gc, - "HNSW index IDs before and after GC" - ); - - assert_eq!( - hnsw_index_ids_before_gc.len(), - 
hnsw_index_ids_after_gc.len(), - "Expected no HNSW indices to be deleted after garbage collection" - ); - - tracing::info!("Verifying records are still accessible after GC"); - validate_test_collection(&mut clients, collection_id).await; - } -} diff --git a/rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs b/rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs index 6eea893b22b..4a83c4af116 100644 --- a/rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs +++ b/rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs @@ -822,7 +822,7 @@ impl GarbageCollectorOrchestrator { return Ok(()); } - if self.cleanup_mode == CleanupMode::DryRun { + if self.cleanup_mode == CleanupMode::DryRunV2 { tracing::info!("Dry run mode, skipping actual deletion"); let response = GarbageCollectorResponse { num_versions_deleted: 0, @@ -1305,7 +1305,7 @@ mod tests { storage, logs, root_manager, - crate::types::CleanupMode::Delete, + crate::types::CleanupMode::DeleteV2, 1, true, false, diff --git a/rust/garbage_collector/src/helper.rs b/rust/garbage_collector/src/helper.rs index 46aa9da7317..62899fad7bb 100644 --- a/rust/garbage_collector/src/helper.rs +++ b/rust/garbage_collector/src/helper.rs @@ -1,14 +1,11 @@ use chroma_types::chroma_proto::log_service_client::LogServiceClient; -use chroma_types::chroma_proto::query_executor_client::QueryExecutorClient; use chroma_types::chroma_proto::sys_db_client::SysDbClient; use chroma_types::chroma_proto::{ - CreateCollectionRequest, CreateDatabaseRequest, CreateTenantRequest, FilterOperator, - GetCollectionWithSegmentsRequest, GetCollectionWithSegmentsResponse, GetPlan, LimitOperator, + CreateCollectionRequest, CreateDatabaseRequest, CreateTenantRequest, ListCollectionVersionsRequest, ListCollectionVersionsResponse, OperationRecord, - ProjectionOperator, PushLogsRequest, ScanOperator, Segment, SegmentScope, Vector, + PushLogsRequest, Segment, SegmentScope, Vector, }; use 
chroma_types::InternalCollectionConfiguration; -use std::collections::HashMap; use tonic::transport::Channel; use uuid::Uuid; @@ -16,7 +13,6 @@ use uuid::Uuid; pub struct ChromaGrpcClients { pub sysdb: SysDbClient, pub log_service: LogServiceClient, - pub query_executor: QueryExecutorClient, } impl ChromaGrpcClients { @@ -27,14 +23,10 @@ impl ChromaGrpcClients { let logservice_channel = Channel::from_static("http://localhost:50054") .connect() .await?; - let queryservice_channel = Channel::from_static("http://localhost:50053") - .connect() - .await?; Ok(Self { sysdb: SysDbClient::new(sysdb_channel), log_service: LogServiceClient::new(logservice_channel), - query_executor: QueryExecutorClient::new(queryservice_channel), }) } @@ -163,136 +155,6 @@ impl ChromaGrpcClients { } } - pub async fn get_records( - &mut self, - collection_id: String, - _ids: Option>, - include_embeddings: bool, - include_metadatas: bool, - include_documents: bool, - ) -> Result> { - // First get collection and its segments - let collection_segments = self - .sysdb - .get_collection_with_segments(GetCollectionWithSegmentsRequest { id: collection_id }) - .await? 
- .into_inner(); - - // Map segments to their scopes - let mut scope_to_segment: HashMap = collection_segments - .segments - .into_iter() - .map(|s| (s.scope, s)) - .collect(); - - // Create the scan operator with collection info and segments - let scan = ScanOperator { - collection: collection_segments.collection, - knn: scope_to_segment.remove(&(SegmentScope::Vector as i32)), - metadata: scope_to_segment.remove(&(SegmentScope::Metadata as i32)), - record: scope_to_segment.remove(&(SegmentScope::Record as i32)), - }; - - // Create the get plan - let get_plan = GetPlan { - scan: Some(scan), - filter: Some(FilterOperator { - ids: None, // ids.map(|ids| UserIds { ids }), - r#where: None, - where_document: None, - }), - limit: Some(LimitOperator { - offset: 0, - limit: None, - }), - projection: Some(ProjectionOperator { - document: false, // include_documents, - embedding: true, // include_embeddings, - metadata: false, // include_metadatas, - }), - }; - - // Execute the get query - let response = self.query_executor.get(get_plan).await?; - let response_inner = response.into_inner(); - - // Convert the response into a GetResult struct - let mut result = GetResult { - ids: Vec::new(), - embeddings: if include_embeddings { - Some(Vec::new()) - } else { - None - }, - metadatas: if include_metadatas { - Some(Vec::new()) - } else { - None - }, - documents: if include_documents { - Some(Vec::new()) - } else { - None - }, - }; - - // Process each record - for record in response_inner.records { - result.ids.push(record.id); - - if include_embeddings { - if let Some(embedding) = record.embedding { - // Convert bytes back to f32 vector - let mut float_vec = Vec::new(); - for chunk in embedding.vector.chunks(4) { - if chunk.len() == 4 { - float_vec - .push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); - } - } - result.embeddings.as_mut().unwrap().push(float_vec); - } - } - - if include_metadatas { - if let Some(ref metadata) = record.metadata { - let mut 
metadata_map = std::collections::HashMap::new(); - for (key, value) in &metadata.metadata { - // Convert UpdateMetadataValue to String based on its value variant - let string_value = match &value.value { - Some(chroma_types::chroma_proto::update_metadata_value::Value::StringValue(s)) => s.clone(), - Some(chroma_types::chroma_proto::update_metadata_value::Value::IntValue(i)) => i.to_string(), - Some(chroma_types::chroma_proto::update_metadata_value::Value::FloatValue(f)) => f.to_string(), - Some(chroma_types::chroma_proto::update_metadata_value::Value::BoolValue(b)) => b.to_string(), - Some(chroma_types::chroma_proto::update_metadata_value::Value::SparseVectorValue(_)) => unimplemented!("Sparse vector is not supported"), - None => String::new(), - }; - metadata_map.insert(key.clone(), string_value); - } - result.metadatas.as_mut().unwrap().push(metadata_map); - } - } - - if include_documents { - if let Some(ref metadata) = record.metadata { - if let Some(doc_value) = metadata.metadata.get("chroma:document") { - // Convert document UpdateMetadataValue to String - if let Some( - chroma_types::chroma_proto::update_metadata_value::Value::StringValue( - doc_str, - ), - ) = &doc_value.value - { - result.documents.as_mut().unwrap().push(doc_str.clone()); - } - } - } - } - } - - Ok(result) - } - #[allow(dead_code)] pub fn sysdb_client(&mut self) -> &mut SysDbClient { &mut self.sysdb @@ -319,23 +181,4 @@ impl ChromaGrpcClients { let response = self.sysdb.list_collection_versions(request).await?; Ok(response.into_inner()) } - - pub async fn get_collection_with_segments( - &mut self, - collection_id: String, - ) -> Result> { - let request = GetCollectionWithSegmentsRequest { id: collection_id }; - - let response = self.sysdb.get_collection_with_segments(request).await?; - Ok(response.into_inner()) - } -} - -// Add this struct to hold the get results -#[derive(Debug)] -pub struct GetResult { - pub ids: Vec, - pub embeddings: Option>>, - pub metadatas: Option>>, - pub documents: 
Option>, } diff --git a/rust/garbage_collector/src/lib.rs b/rust/garbage_collector/src/lib.rs index 4da8e105246..1ff7919564c 100644 --- a/rust/garbage_collector/src/lib.rs +++ b/rust/garbage_collector/src/lib.rs @@ -24,7 +24,6 @@ use uuid::Uuid; pub mod config; mod construct_version_graph_orchestrator; mod garbage_collector_component; -pub mod garbage_collector_orchestrator; pub mod garbage_collector_orchestrator_v2; mod log_only_orchestrator; diff --git a/rust/garbage_collector/src/operators/delete_unused_files.rs b/rust/garbage_collector/src/operators/delete_unused_files.rs index e87367e1f9d..a378a5b3bae 100644 --- a/rust/garbage_collector/src/operators/delete_unused_files.rs +++ b/rust/garbage_collector/src/operators/delete_unused_files.rs @@ -94,7 +94,7 @@ impl Operator for DeleteUnusedF // It's possible that the file was already renamed/deleted in the last run that // did not finish successfully (i.e. crashed before committing the work to SysDb). match self.cleanup_mode { - CleanupMode::DryRun | CleanupMode::DryRunV2 => {} + CleanupMode::DryRunV2 => {} CleanupMode::Rename => { // Soft delete - rename the file if !all_files.is_empty() { @@ -121,7 +121,7 @@ impl Operator for DeleteUnusedF } } } - CleanupMode::Delete | CleanupMode::DeleteV2 => { + CleanupMode::DeleteV2 => { // Hard delete - remove the file if !all_files.is_empty() { // The S3 DeleteObjects API allows up to 1000 objects per request @@ -197,7 +197,7 @@ mod tests { let operator = DeleteUnusedFilesOperator::new( storage.clone(), - CleanupMode::DryRun, + CleanupMode::DryRunV2, "test_tenant".to_string(), ); let input = DeleteUnusedFilesInput { @@ -268,7 +268,7 @@ mod tests { let operator = DeleteUnusedFilesOperator::new( storage.clone(), - CleanupMode::Delete, + CleanupMode::DeleteV2, "test_tenant".to_string(), ); let input = DeleteUnusedFilesInput { @@ -301,7 +301,7 @@ mod tests { // Test Delete mode - should succeed but record the error in deletion list let delete_operator = 
DeleteUnusedFilesOperator::new( storage.clone(), - CleanupMode::Delete, + CleanupMode::DeleteV2, "test_tenant".to_string(), ); let result = delete_operator @@ -327,8 +327,11 @@ mod tests { assert!(result.is_ok()); // Test DryRun mode with nonexistent files (should succeed) - let list_operator = - DeleteUnusedFilesOperator::new(storage, CleanupMode::DryRun, "test_tenant".to_string()); + let list_operator = DeleteUnusedFilesOperator::new( + storage, + CleanupMode::DryRunV2, + "test_tenant".to_string(), + ); let result = list_operator .run(&DeleteUnusedFilesInput { unused_s3_files: unused_files, diff --git a/rust/garbage_collector/src/operators/delete_unused_logs.rs b/rust/garbage_collector/src/operators/delete_unused_logs.rs index fe6ef4e66a8..53e4cce6c85 100644 --- a/rust/garbage_collector/src/operators/delete_unused_logs.rs +++ b/rust/garbage_collector/src/operators/delete_unused_logs.rs @@ -129,7 +129,7 @@ impl Operator for DeleteUnusedLog return Err(DeleteUnusedLogsError::Gc(err)); }; match self.mode { - CleanupMode::Delete | CleanupMode::DeleteV2 => { + CleanupMode::DeleteV2 => { if let Err(err) = writer.garbage_collect_phase3_delete_garbage(&GarbageCollectionOptions::default()).await { tracing::error!("Unable to garbage collect log for collection [{collection_id}]: {err}"); return Err(DeleteUnusedLogsError::Wal3{ collection_id, err}); @@ -149,7 +149,7 @@ impl Operator for DeleteUnusedLog ); } match self.mode { - CleanupMode::Delete | CleanupMode::DeleteV2 => { + CleanupMode::DeleteV2 => { if !input.collections_to_destroy.is_empty() { let mut log_destroy_futures = Vec::with_capacity(input.collections_to_destroy.len()); diff --git a/rust/garbage_collector/src/types.rs b/rust/garbage_collector/src/types.rs index 2b63d3afdbd..7cd722f77eb 100644 --- a/rust/garbage_collector/src/types.rs +++ b/rust/garbage_collector/src/types.rs @@ -8,23 +8,15 @@ pub(crate) const RENAMED_FILE_PREFIX: &str = "gc/renamed/"; #[derive(Debug, Clone, Copy, PartialEq, Eq, 
serde::Deserialize, Default)] #[serde(rename_all = "lowercase")] pub enum CleanupMode { + /// Move files to a deletion directory instead of removing them + Rename, // todo: remove:? /// Only list files that would be affected without making changes #[default] - DryRun, - /// Move files to a deletion directory instead of removing them - Rename, - /// Permanently delete files - Delete, DryRunV2, + /// Permanently delete files DeleteV2, } -impl CleanupMode { - pub fn is_v2(&self) -> bool { - matches!(self, CleanupMode::DryRunV2 | CleanupMode::DeleteV2) - } -} - #[derive(Debug, Clone, Copy)] pub enum VersionStatus { #[allow(dead_code)] diff --git a/rust/garbage_collector/tests/prop_test_local_files.rs b/rust/garbage_collector/tests/prop_test_local_files.rs deleted file mode 100644 index 8b977312e6b..00000000000 --- a/rust/garbage_collector/tests/prop_test_local_files.rs +++ /dev/null @@ -1,981 +0,0 @@ -// Property Tests for Garbage Collection service. -// RefState and SUT based implementation using TestSysDb and LocalStorage. -// -// SUT uses TestSysDb and LocalStorage. -// Main transitions are: -// 1. Create a new collection. -// 2. Add a new version to a collection. -// 3. Cleanup versions from a collection. -// -// For AddVersion, -// SUT creates a new version file and adds the segment block ids to it. -// SparseIndices are created on LocalStorage to mimic compaction of new data. -// While SparseIndex files are created on disk, there is no creation of actual Block files on disk. -// A mapping of collection to segment to block ids is maintained in RefState. -// This allows each AddVersion to work off the previous version's segment to block id mapping. -// RefState is updated with the new version's segment to block id mapping. -// So, RefState does not use TestSysDb or LocalStorage. -// -// For CleanupVersions, -// SUT runs the actual Garbage Collection orchestrator. -// RefState does its independent computation of versions to delete, and the block ids to delete. 
-// -// Note on Time manipulation - -// Using mock of Tokio time can create issues which are hard to debug due to -// implementation of the mock when runtime has no jobs to do. -// Time is maintained as a u64 monotonic counter. -// Each time a transition happens, this counter (RefState::highest_registered_time) is increased by 1. -// This allows a very deterministic run (& re-run) of the state machine. -// All transitions that need time, use this counter, whose starting value it 100. -// Eg: -// A collection is created at t=100. -// New Version is added at t=101 -// Another Version is added at t=102 -// CleanUp versions can be called at t=103, with a cutoff of 1 seconds. - -// TODO(rohitcp): -// Min versions to keep is 2. Make this configurable, and randomize it per collection. - -use chroma_blockstore::test_utils::sparse_index_test_utils::create_test_sparse_index; -use chroma_storage::local::LocalStorage; -use chroma_storage::Storage; -use chroma_sysdb::GetCollectionsOptions; -use chroma_sysdb::TestSysDb; -use chroma_system::Orchestrator; -use chroma_types::chroma_proto::FilePaths; -use chroma_types::chroma_proto::FlushSegmentCompactionInfo; -use chroma_types::Segment; -use chroma_types::SegmentFlushInfo; -use chroma_types::SegmentScope; -use chroma_types::SegmentType; -use chroma_types::{CollectionUuid, SegmentUuid}; -use chrono::DateTime; -use futures::executor::block_on; -use garbage_collector_library::garbage_collector_orchestrator::GarbageCollectorOrchestrator; -use garbage_collector_library::types::CleanupMode; -use garbage_collector_library::types::GarbageCollectorResponse; -use itertools::Itertools; -use proptest::prelude::*; -use proptest::strategy::BoxedStrategy; -use proptest_state_machine::{prop_state_machine, ReferenceStateMachine, StateMachineTest}; -use rand::prelude::SliceRandom; -use std::collections::{HashMap, HashSet}; -use std::str::FromStr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use uuid::Uuid; - -// 
SegmentBlockIdInfo is used to keep track of the segment block ids for a version. -// A vector of SegmentBlockIdInfo is enough to get all block ids associated with a version. -#[derive(Clone, Debug)] -struct SegmentBlockIdInfo { - segment_id: SegmentUuid, - block_ids: Vec, - segment_type: SegmentType, -} - -// Transitions for the State Machine. -#[derive(Clone, Debug)] -enum Transition { - // Create a new collection. - CreateCollection { - id: String, - creation_time_secs: u64, - segments: Vec, - }, - // Add version to a specific collection. - // id is the name of the collection. - AddVersion { - id: String, - segment_block_ids: Vec, - to_remove_block_ids: Vec, - creation_time_secs: u64, - }, - // Cleanup versions from a specific collection. - CleanupVersions { - id: String, - cutoff_time: u64, - }, -} - -type VersionToSegmentBlockIdsMap = HashMap>; -type VersionToFilesMap = HashMap>; -type VersionToCreationTimeMap = HashMap; - -#[derive(Clone, Debug)] -struct RefState { - // Keep track of collections. - // Used in pre-conditions to ensure AddVersion is only called on existing collections. - collections: HashSet, - // Keep track of creation time for each version. - coll_to_creation_time_map: HashMap, - // Keep track of the segment and corresponding block ids for each version. - // i.e. collection_uuid -> version -> Vec - coll_to_segment_block_ids_map: HashMap, - // Keep track of dropped block ids for each version. - // This info can be used to compute the block ids to delete. - // Using this info minimizes the change of making a mistake in cleanup computation. - coll_to_dropped_block_ids_map: HashMap, - // Min versions to keep for all collections. - min_versions_to_keep: u64, - // Keep track of the highest registered time for all collections. - // Helps to mock the time. - highest_registered_time: u64, // TODO: Suffix with _secs to make it consistent with cutoff_secs. - // Keep track of the files that were deleted in the last cleanup. 
- last_cleanup_files: Vec, - last_cleanup_collection_id: String, -} - -impl RefState { - // TODO(rohitcp): Remove this if not needed. - fn _new(min_versions_to_keep: u64) -> Self { - Self { - collections: HashSet::new(), - coll_to_segment_block_ids_map: HashMap::new(), - coll_to_creation_time_map: HashMap::new(), - coll_to_dropped_block_ids_map: HashMap::new(), - min_versions_to_keep, - highest_registered_time: 100, - last_cleanup_files: Vec::new(), - last_cleanup_collection_id: String::new(), - } - } - - // Gets the block ids for a version. Uses to create mock block ids for next version. - // Keep track of this enables the next version to re-use block ids and mimic actual prod behavior. - pub fn get_segment_block_ids_for_version( - &self, - collection_id: String, - version: u64, - ) -> Vec { - self.coll_to_segment_block_ids_map - .get(&collection_id) - .and_then(|versions| versions.get(&version)) - .cloned() - .unwrap_or_default() - } - - pub fn get_current_version(&self, collection_id: String) -> u64 { - self.coll_to_creation_time_map - .get(&collection_id) - .and_then(|versions| versions.keys().max().copied()) - .unwrap_or(0) // Return 0 if no versions exist - } - - // Update the mapping of which block ids are present for a version. - // The RefState should only keep track of version to files mapping, and nothing else. - fn add_version( - mut self, - id: String, - version: u64, - creation_time_secs: u64, - segment_block_ids: Vec, - dropped_block_ids: Vec, - ) -> Self { - // Only proceed if collection exists - if !self.collections.contains(&id) { - return self; - } - - // Initialize maps for new collections if they don't exist - self.coll_to_creation_time_map - .entry(id.clone()) - .or_default(); - self.coll_to_dropped_block_ids_map - .entry(id.clone()) - .or_default(); - self.coll_to_segment_block_ids_map - .entry(id.clone()) - .or_default(); - - // Assert that the creation time is greater than the highest registered time. 
- assert!(creation_time_secs > self.highest_registered_time); - self.highest_registered_time = creation_time_secs; - - // Update the mappings - self.coll_to_creation_time_map - .get_mut(&id) - .unwrap() - .insert(version, creation_time_secs); - self.coll_to_dropped_block_ids_map - .get_mut(&id) - .unwrap() - .insert(version, dropped_block_ids); - self.coll_to_segment_block_ids_map - .get_mut(&id) - .unwrap() - .insert(version, segment_block_ids); - - self - } - - fn create_collection( - mut self, - id: String, - segments: Vec, - creation_time_secs: u64, - ) -> Self { - assert!( - !self.collections.contains(&id), - "RSM: create_collection: collection already exists: {}", - id - ); - - self.collections.insert(id.clone()); - // Initialize empty maps for the new collection - self.coll_to_dropped_block_ids_map - .insert(id.clone(), HashMap::new()); - - // Put the segment block ids for the collection. - // AddVersion calls will need to use this to find the segment block ids for the collection. - self.coll_to_segment_block_ids_map - .insert(id.clone(), HashMap::new()); - let segment_block_ids = segments - .iter() - .map(|s| SegmentBlockIdInfo { - segment_id: s.id, - block_ids: vec![], - segment_type: s.r#type, - }) - .collect(); - self.coll_to_segment_block_ids_map - .get_mut(&id) - .unwrap() - .insert(0, segment_block_ids); - - // Insert the initial version to creation time mapping. - // This is used to find current version for the collection, and the creation time for the initial version. 
- let mut initial_version_to_creation_time = HashMap::new(); - initial_version_to_creation_time.insert(0, creation_time_secs); - self.coll_to_creation_time_map - .insert(id.clone(), initial_version_to_creation_time); - self - } - - fn cleanup_versions(mut self, collection_id: String, cutoff_time: u64) -> Self { - assert!( - self.collections.contains(&collection_id), - "RSM: cleanup_versions: collection does not exist: {}", - collection_id - ); - - // For debugging purposes, keep track of the collection id for which cleanup is being done. - self.last_cleanup_collection_id = collection_id.clone(); - - // First get all versions present for the collection - let versions_present: Vec = self - .coll_to_creation_time_map - .get(&collection_id) - .unwrap() - .iter() - .sorted_by_key(|(version, _)| *version) - .rev() - .map(|(version, _)| *version) - .collect(); - - // Then get the oldest version to keep - let oldest_version_to_keep = versions_present - .get(self.min_versions_to_keep as usize - 1) // -1 since its 0-indexed. 
- .unwrap(); - - let mut versions_to_delete = self - .coll_to_creation_time_map - .get(&collection_id) - .unwrap() - .iter() - .filter(|(version, creation_time)| { - **creation_time < cutoff_time && version < &oldest_version_to_keep && **version > 0 - }) - .map(|(version, _)| *version) - .collect::>(); - versions_to_delete.sort(); - - tracing::info!( - line = line!(), - "RefState: cleanup_versions: versions to creation time: {:?}", - self.coll_to_creation_time_map - ); - tracing::info!( - line = line!(), - "RSM: cleanup_versions: cutoff_time: {:?}, versions_present: {:?}, oldest_to_keep: {:?}, to_delete: {:?} ", - cutoff_time, - versions_present, - oldest_version_to_keep, - versions_to_delete - ); - - // Method 1: Using segment block IDs - let mut files_to_delete_method1 = HashSet::new(); - for version in versions_to_delete.clone() { - let next_version = version + 1; // Since the min_versions_to_keep is always > 1, we can be sure next_version exists in the hashamp. - let current_segments = &self.coll_to_segment_block_ids_map[&collection_id][&version]; - let next_segments = &self.coll_to_segment_block_ids_map[&collection_id][&next_version]; - - for current_segment in current_segments { - if let Some(next_segment) = next_segments - .iter() - .find(|s| s.segment_id == current_segment.segment_id) - { - for block_id in ¤t_segment.block_ids { - // If the block id is not present in the next version, add it to the files to delete. 
- if !next_segment.block_ids.contains(block_id) { - files_to_delete_method1.insert(*block_id); - } - } - } else { - panic!( - "RSM: cleanup_versions: segment not found in next version: {:?}", - current_segment.segment_id - ); - } - } - } - - // Method 2: Using dropped block IDs map - let mut files_to_delete_method2 = HashSet::new(); - for version in versions_to_delete.clone() { - let next_version = version + 1; - if let Some(dropped_blocks) = - self.coll_to_dropped_block_ids_map[&collection_id].get(&next_version) - { - files_to_delete_method2.extend(dropped_blocks.iter().cloned()); - } - } - - // Verify both methods give the same result - assert_eq!( - files_to_delete_method1, - files_to_delete_method2, - "RSM: cleanup_versions: different results from two methods. Method1: {:?}, Method2: {:?}", - files_to_delete_method1, - files_to_delete_method2 - ); - - // Update last_cleanup_files with the files to delete (can use either method since they're equal) - self.last_cleanup_files = files_to_delete_method1 - .iter() - .map(|uuid| uuid.to_string()) - .collect(); - - // Print the entire version to segment block id mapping for the collection. - tracing::info!( - line = line!(), - "************\nRSM: cleanup_versions: version to segment block id mapping for collection: {:?}\n************", - self.coll_to_segment_block_ids_map[&collection_id] - ); - // Print the files to delete. 
- tracing::info!( - line = line!(), - "************\nRSM: cleanup_versions: files to delete for collection: {:?}\n************", - self.last_cleanup_files - ); - - self - } -} - -impl ReferenceStateMachine for RefState { - type Transition = Transition; - type State = RefState; - - fn init_state() -> BoxedStrategy { - Just(Self { - collections: HashSet::new(), - coll_to_creation_time_map: HashMap::new(), - coll_to_dropped_block_ids_map: HashMap::new(), - coll_to_segment_block_ids_map: HashMap::new(), - min_versions_to_keep: 2, - highest_registered_time: 100, - last_cleanup_files: Vec::new(), - last_cleanup_collection_id: String::new(), - }) - .boxed() - } - - fn transitions(state: &Self::State) -> BoxedStrategy { - let new_collection_id_strategy = Just(()).prop_map(|_| Uuid::new_v4().to_string()); - let existing_collection_ids: Vec = state.collections.iter().cloned().collect(); - let state_clone = state.clone(); - // Create a random cutoff window between 1 and 10. - let cutoff_window_secs = 3; - // Compute the cutoff time. 
- let cutoff_time = state_clone.highest_registered_time - cutoff_window_secs; - - if existing_collection_ids.is_empty() { - // If no collections exist, only generate CreateCollection transitions - new_collection_id_strategy - .prop_map(move |id| { - let next_time = state_clone.highest_registered_time + 1; - Transition::CreateCollection { - id: id.clone(), - creation_time_secs: next_time, - segments: generate_segments_for_collection( - CollectionUuid::from_str(&id.clone()).unwrap(), - ), - } - }) - .boxed() - } else { - // Otherwise generate all types of transitions - prop_oneof![ - // Weight the strategies to make CreateCollection less frequent - 1 => new_collection_id_strategy - .prop_map(move |id| { - let next_time = state_clone.highest_registered_time + 1; - Transition::CreateCollection { - id: id.clone(), - creation_time_secs: next_time, - segments: generate_segments_for_collection(CollectionUuid::from_str(&id.clone()).unwrap()), - } - }), - 4 => prop::sample::select(existing_collection_ids.clone()).prop_map(move |id| { - let segment_block_ids = state_clone.get_segment_block_ids_for_version( - id.clone(), - state_clone.get_current_version(id.clone()), - ); - // tracing::info!( - // line = line!(), - // "RSM: transitions: segment_block_ids for existing collection: {:?}", - // segment_block_ids - // ); - let (segment_block_ids_new_version, dropped_block_ids) = segment_block_ids_for_next_version(segment_block_ids); - Transition::AddVersion { - id: id.clone(), - to_remove_block_ids: dropped_block_ids, - creation_time_secs: state_clone.highest_registered_time + 1, - segment_block_ids: segment_block_ids_new_version, - } - }), - 2 => prop::sample::select(existing_collection_ids).prop_map(move |id| { - Transition::CleanupVersions { - id: id.clone(), - cutoff_time, - } - }), - ] - .boxed() - } - } - - fn preconditions(state: &Self::State, transition: &Self::Transition) -> bool { - match transition { - Transition::AddVersion { - id, - to_remove_block_ids: _, - 
creation_time_secs: _, - segment_block_ids: _, - } => state.collections.contains(id), - Transition::CleanupVersions { id, cutoff_time } => { - state.collections.contains(id) && - *cutoff_time <= state.highest_registered_time && - // Check if we have enough versions to perform cleanup - state.coll_to_creation_time_map - .get(id) - .map(|versions| versions.len() > state.min_versions_to_keep as usize) - .unwrap_or(false) - } - Transition::CreateCollection { - id, - segments: _, - creation_time_secs: _, - } => !state.collections.contains(id), - } - } - - fn apply(state: Self::State, transition: &Self::Transition) -> Self { - // tracing::info!( - // line = line!(), - // "Applying transition: {:?} to RefState", - // transition - // ); - match transition { - Transition::AddVersion { - id, - to_remove_block_ids, - creation_time_secs, - segment_block_ids, - } => state.clone().add_version( - id.clone(), - state.clone().get_current_version(id.clone()) + 1, - *creation_time_secs, - segment_block_ids.clone(), - to_remove_block_ids.clone(), - ), - Transition::CleanupVersions { id, cutoff_time } => { - state.clone().cleanup_versions(id.clone(), *cutoff_time) - } - Transition::CreateCollection { - id, - segments, - creation_time_secs, - } => state - .clone() - .create_collection(id.clone(), segments.clone(), *creation_time_secs), - } - } -} - -// Add this at the top level of the file -static INVARIANT_CHECK_COUNT: AtomicUsize = AtomicUsize::new(0); - -struct GcTest { - storage: Storage, - sysdb: chroma_sysdb::SysDb, - last_cleanup_files: Vec, -} - -impl Default for GcTest { - fn default() -> Self { - // Create local storage for testing - let tmp_dir = tempfile::tempdir().unwrap(); - let storage_dir = tmp_dir.path().to_str().unwrap(); - tracing::info!(line = line!(), "GcTest: storage_dir: {:?}", storage_dir); - let storage = Storage::Local(LocalStorage::new(storage_dir)); - - // Create test sysdb instance - let mut sysdb = chroma_sysdb::SysDb::Test(TestSysDb::new()); - - // Set 
storage using block_on since set_storage is async - if let chroma_sysdb::SysDb::Test(test_sysdb) = &mut sysdb { - test_sysdb.set_storage(Some(storage.clone())); - } - - Self { - storage, - sysdb, - last_cleanup_files: Vec::new(), - } - } -} - -fn get_version_file_name(sysdb: &chroma_sysdb::SysDb, id: String) -> String { - let collection_id = CollectionUuid::from_str(&id).unwrap(); - match sysdb { - chroma_sysdb::SysDb::Test(test_sysdb) => test_sysdb.get_version_file_name(collection_id), - _ => panic!("get_version_file_name only supported for TestSysDb"), - } -} - -impl GcTest { - // Logic: - // 1. Get version file name from sysdb. - // 2. Prepare to call flush compaction. - // 3. Call FlushCompaction on TestSysDb. - // 4. Update the version file in storage since SysDb does not do this. - async fn add_version( - mut self, - id: String, - _version: u64, - creation_time_secs: u64, - segment_block_ids: Vec, - ) -> Self { - // Set the mock time before calling flush_compaction - if let chroma_sysdb::SysDb::Test(test_sysdb) = &mut self.sysdb { - test_sysdb.set_mock_time(creation_time_secs); - } - - // 1. Get version file name and current version from sysdb - let collection_id = CollectionUuid::from_str(&id).unwrap(); - let collections = self - .sysdb - .get_collections(GetCollectionsOptions { - collection_id: Some(collection_id), - ..Default::default() - }) - .await - .unwrap(); - - let collection = match collections.first() { - Some(c) => c, - None => return self, - }; - let current_version = collection.version; - - // ----- Prepare to call flush compaction. --------- - // Create the new record segment. 
- let record_segment_info = segment_block_ids - .iter() - .find(|sbi| sbi.segment_type == SegmentType::BlockfileRecord) - .unwrap() - .clone(); - // Create sparse index for record segment - let sparse_index_id = create_test_sparse_index( - &self.storage, - Uuid::new_v4(), - record_segment_info.block_ids.clone(), - Some("test_si_rec_".to_string()), - "".to_string(), - ) - .await - .unwrap(); - // Create segment info for this version - let record_segment_id = record_segment_info.segment_id; - let mut file_paths = HashMap::new(); - file_paths.insert( - "rec_blockfile_1".to_string(), - FilePaths { - paths: vec![sparse_index_id.to_string()], - }, - ); - let record_segment_info = FlushSegmentCompactionInfo { - segment_id: record_segment_id.to_string(), - file_paths, - }; - - // Create sparse index for metadata segment - let metadata_segment_info = segment_block_ids - .iter() - .find(|sbi| sbi.segment_type == SegmentType::BlockfileMetadata) - .unwrap() - .clone(); - let sparse_index_id_metadata = create_test_sparse_index( - &self.storage, - Uuid::new_v4(), - metadata_segment_info.block_ids.clone(), - Some("test_si_meta_".to_string()), - "".to_string(), - ) - .await - .unwrap(); - // Create segment info for this version - let metadata_segment_id = metadata_segment_info.segment_id; - let mut file_paths_metadata = HashMap::new(); - file_paths_metadata.insert( - "metadata_blockfile_1".to_string(), - FilePaths { - paths: vec![sparse_index_id_metadata.to_string()], - }, - ); - let metadata_segment_info = FlushSegmentCompactionInfo { - segment_id: metadata_segment_id.to_string(), - file_paths: file_paths_metadata, - }; - - let record_segment_id = SegmentUuid::from_str(&record_segment_info.segment_id).unwrap(); - let metadata_segment_id = SegmentUuid::from_str(&metadata_segment_info.segment_id).unwrap(); - - // Handle flush_compaction errors - match self - .sysdb - .clone() - .flush_compaction( - "tenant".to_string(), - collection_id, - 0, // log_position - current_version, - 
Arc::new([ - SegmentFlushInfo { - segment_id: record_segment_id, - file_paths: record_segment_info - .file_paths - .into_iter() - .map(|(k, v)| (k, v.paths)) - .collect(), - }, - SegmentFlushInfo { - segment_id: metadata_segment_id, - file_paths: metadata_segment_info - .file_paths - .into_iter() - .map(|(k, v)| (k, v.paths)) - .collect(), - }, - ]), - 0, // total_records_post_compaction - 0, // size_bytes_post_compaction - None, - ) - .await - { - Ok(_) => (), - Err(e) => { - panic!("Failed to flush compaction: {:?}", e); - } - } - - self - } - - async fn create_collection( - mut self, - id: String, - segments: Vec, - creation_time_secs: u64, - ) -> Self { - // Set the mock time before creating collection - if let chroma_sysdb::SysDb::Test(test_sysdb) = &mut self.sysdb { - test_sysdb.set_mock_time(creation_time_secs); - } - - let collection_id = CollectionUuid::from_str(&id).unwrap(); - let result = self - .sysdb - .create_collection( - "tenant".to_string(), - "database".to_string(), - collection_id, - "collection".to_string(), - segments, - None, - None, - None, - None, - false, - ) - .await; - assert!( - result.is_ok(), - "Failed to create collection: {:?}", - result.err() - ); - self - } - - async fn cleanup_versions(mut self, id: String, cutoff_time: u64) -> Self { - let mut sysdb = self.sysdb.clone(); - let storage = self.storage.clone(); - - // Do the actual Garbage Collection. 
- let system = chroma_system::System::new(); - let dispatcher = chroma_system::Dispatcher::new(chroma_system::DispatcherConfig::default()); - let mut dispatcher_handle = system.start_component(dispatcher); - - let collection_id = Uuid::parse_str(&id).unwrap(); - let collections = sysdb - .get_collections(GetCollectionsOptions { - collection_id: Some(CollectionUuid(collection_id)), - ..Default::default() - }) - .await - .unwrap(); - - // Return early if no collection is found - let collection = match collections.first() { - Some(c) => c, - None => { - panic!( - "Collection not found during cleanup: {}. Check preconditions logic.", - id - ); - } - }; - - let version_file_name = get_version_file_name(&self.sysdb, id); - let orchestrator = GarbageCollectorOrchestrator::new( - collection.collection_id, - version_file_name, - DateTime::from_timestamp(cutoff_time as i64, 0).unwrap(), - sysdb, - dispatcher_handle.clone(), - storage, - CleanupMode::Delete, - ); - - self.last_cleanup_files = Vec::new(); - match orchestrator.run(system.clone()).await { - #[expect(deprecated)] - Ok(GarbageCollectorResponse { deletion_list, .. }) => { - self.last_cleanup_files = deletion_list; - - tracing::info!( - line = line!(), - "GcTest: cleanup_versions: last_cleanup_files: {:?}", - self.last_cleanup_files - ); - } - Err(e) => { - tracing::error!(line = line!(), "Error during garbage collection: {:?}", e); - } - } - - // Print the files to delete. 
- tracing::debug!( - line = line!(), - "==========\nGcTest: cleanup_versions: last_cleanup_files: {:?}\n==========", - self.last_cleanup_files - ); - - system.stop().await; - system.join().await; - dispatcher_handle.stop(); - dispatcher_handle.join().await.unwrap(); - - self - } -} - -impl StateMachineTest for GcTest { - type SystemUnderTest = Self; - type Reference = RefState; - - fn init_test( - _ref_state: &::State, - ) -> Self::SystemUnderTest { - tracing::info!(line = line!(), "Initializing new test instance"); - Self::default() - } - - fn apply( - state: Self::SystemUnderTest, - ref_state: &::State, - transition: ::Transition, - ) -> Self::SystemUnderTest { - tracing::debug!( - line = line!(), - "Applying transition: {:?} to SUT", - transition - ); - match transition { - Transition::AddVersion { - id, - to_remove_block_ids: _, - creation_time_secs, - segment_block_ids, - } => block_on(state.add_version( - id.clone(), - ref_state.get_current_version(id.clone()) + 1, - creation_time_secs, - segment_block_ids.clone(), - )), - Transition::CreateCollection { - id, - segments, - creation_time_secs, - } => block_on(state.create_collection(id, segments, creation_time_secs)), - Transition::CleanupVersions { id, cutoff_time } => { - block_on(state.cleanup_versions(id, cutoff_time)) - } - } - } - - fn check_invariants( - state: &Self::SystemUnderTest, - ref_state: &::State, - ) { - INVARIANT_CHECK_COUNT.fetch_add(1, Ordering::SeqCst); - tracing::debug!( - line = line!(), - "Checking invariants (count: {})", - INVARIANT_CHECK_COUNT.load(Ordering::SeqCst) - ); - - // Remove the block/ prefix and sort - let mut state_last_cleanup_files: Vec = state - .last_cleanup_files - .iter() - .map(|file| file.replace("block/", "")) - .collect(); - state_last_cleanup_files.sort(); - - // Sort reference state files - let mut ref_last_cleanup_files = ref_state.last_cleanup_files.clone(); - ref_last_cleanup_files.sort(); - - assert_eq!( - state_last_cleanup_files, - 
ref_last_cleanup_files, - "Cleanup files mismatch for collection: {:?} after sorting - SUT: {:?}, Reference: {:?}", - ref_state.last_cleanup_collection_id, - state_last_cleanup_files, - ref_last_cleanup_files - ); - } -} - -fn generate_segments_for_collection(collection_id: CollectionUuid) -> Vec { - let record_segment = Segment { - id: SegmentUuid::new(), - r#type: SegmentType::BlockfileRecord, - scope: SegmentScope::RECORD, - collection: collection_id, - metadata: None, - file_path: HashMap::new(), - }; - let metadata_segment = Segment { - id: SegmentUuid::new(), - r#type: SegmentType::BlockfileMetadata, - scope: SegmentScope::METADATA, - collection: collection_id, - metadata: None, - file_path: HashMap::new(), - }; - - vec![record_segment, metadata_segment] -} - -fn segment_block_ids_for_next_version( - existing_segment_block_ids: Vec, -) -> (Vec, Vec) { - let mut new_segment_block_ids = Vec::new(); - let mut dropped_block_ids = Vec::new(); - for segment in existing_segment_block_ids { - let block_ids = segment.block_ids.clone(); - let (new_block_ids, dropped_ids) = blocks_ids_for_next_version(block_ids); - new_segment_block_ids.push(SegmentBlockIdInfo { - segment_id: segment.segment_id, - block_ids: new_block_ids, - segment_type: segment.segment_type, - }); - // Add dropped block ids to the list of dropped block ids - dropped_block_ids.extend(dropped_ids); - } - - (new_segment_block_ids, dropped_block_ids) -} - -fn blocks_ids_for_next_version(block_ids: Vec) -> (Vec, Vec) { - let mut rng = rand::thread_rng(); - - // If there are no existing block IDs, just generate new ones - if block_ids.is_empty() { - let num_new_blocks = rng.gen_range(1..=10); - let new_block_ids: Vec = (0..num_new_blocks).map(|_| Uuid::new_v4()).collect(); - // tracing::info!( - // line = line!(), - // "RSM: new blocks_ids_for_next_version: new_block_ids: {:?}", - // new_block_ids - // ); - return (new_block_ids, Vec::new()); - } - - let keep_percentage = rng.gen_range(30..=90) as f64 / 
100.0; - let num_to_keep = (block_ids.len() as f64 * keep_percentage).ceil() as usize; - let mut kept_block_ids: Vec = block_ids - .choose_multiple(&mut rng, num_to_keep) - .cloned() - .collect(); - let num_new_blocks = rng.gen_range(0..=10); - let new_block_ids: Vec = (0..num_new_blocks).map(|_| Uuid::new_v4()).collect(); - let dropped_block_ids = block_ids - .into_iter() - .filter(|id| !kept_block_ids.contains(id)) - .collect(); - - kept_block_ids.extend(new_block_ids); - (kept_block_ids, dropped_block_ids) -} - -prop_state_machine! { - fn run_gc_test( - sequential - 1..50 - => - GcTest - ); -} - -#[tokio::test(flavor = "multi_thread")] -async fn run_gc_test_ext() { - let _ = tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - INVARIANT_CHECK_COUNT.store(0, Ordering::SeqCst); - run_gc_test(); - let checks = INVARIANT_CHECK_COUNT.load(Ordering::SeqCst); - assert!( - checks > 0, - "check_invariants was never called! Count: {}", - checks - ); -} diff --git a/rust/garbage_collector/tests/proptest_helpers/garbage_collector_under_test.rs b/rust/garbage_collector/tests/proptest_helpers/garbage_collector_under_test.rs index d9b5d55288d..585eba138dc 100644 --- a/rust/garbage_collector/tests/proptest_helpers/garbage_collector_under_test.rs +++ b/rust/garbage_collector/tests/proptest_helpers/garbage_collector_under_test.rs @@ -328,7 +328,7 @@ impl StateMachineTest for GarbageCollectorUnderTest { state.storage.clone(), state.logs.clone(), state.root_manager.clone(), - CleanupMode::Delete, + CleanupMode::DeleteV2, min_versions_to_keep as u32, true, false,