@@ -62,8 +62,6 @@ enum GarbageCollectCollectionError {
6262 #[ error( "Uninitialized: missing dispatcher or system" ) ]
6363 Uninitialized ,
6464 #[ error( "Failed to run garbage collection orchestrator: {0}" ) ]
65- OrchestratorError ( #[ from] crate :: garbage_collector_orchestrator:: GarbageCollectorError ) ,
66- #[ error( "Failed to run garbage collection orchestrator: {0}" ) ]
6765 OrchestratorV2Error ( #[ from] crate :: garbage_collector_orchestrator_v2:: GarbageCollectorError ) ,
6866}
6967
@@ -166,75 +164,39 @@ impl GarbageCollector {
166164 . as_ref ( )
167165 . ok_or ( GarbageCollectCollectionError :: Uninitialized ) ?;
168166
169- if cleanup_mode. is_v2 ( ) {
170- let enable_log_gc = collection. tenant <= self . config . enable_log_gc_for_tenant_threshold
171- || self
172- . config
173- . enable_log_gc_for_tenant
174- . contains ( & collection. tenant ) ;
167+ let enable_log_gc = collection. tenant <= self . config . enable_log_gc_for_tenant_threshold
168+ || self
169+ . config
170+ . enable_log_gc_for_tenant
171+ . contains ( & collection. tenant ) ;
175172
176- let orchestrator =
177- crate :: garbage_collector_orchestrator_v2:: GarbageCollectorOrchestrator :: new (
178- collection. id ,
179- collection. version_file_path ,
180- collection. lineage_file_path ,
181- version_absolute_cutoff_time,
182- collection_soft_delete_absolute_cutoff_time,
183- self . sysdb_client . clone ( ) ,
184- dispatcher. clone ( ) ,
185- system. clone ( ) ,
186- self . storage . clone ( ) ,
187- self . logs . clone ( ) ,
188- self . root_manager . clone ( ) ,
189- cleanup_mode,
190- self . config . min_versions_to_keep ,
191- enable_log_gc,
192- enable_dangerous_option_to_ignore_min_versions_for_wal3,
193- ) ;
194-
195- let started_at = SystemTime :: now ( ) ;
196- let result = match orchestrator. run ( system. clone ( ) ) . await {
197- Ok ( res) => res,
198- Err ( e) => {
199- tracing:: error!( "Failed to run garbage collection orchestrator v2: {:?}" , e) ;
200- return Err ( GarbageCollectCollectionError :: OrchestratorV2Error ( e) ) ;
201- }
202- } ;
203- let duration_ms = started_at
204- . elapsed ( )
205- . map ( |d| d. as_millis ( ) as u64 )
206- . unwrap_or ( 0 ) ;
207- self . job_duration_ms_metric . record ( duration_ms, & [ ] ) ;
208- self . total_files_deleted_metric . add (
209- result. num_files_deleted as u64 ,
210- & [ opentelemetry:: KeyValue :: new (
211- "cleanup_mode" ,
212- format ! ( "{:?}" , cleanup_mode) ,
213- ) ] ,
214- ) ;
215- self . total_versions_deleted_metric . add (
216- result. num_versions_deleted as u64 ,
217- & [ opentelemetry:: KeyValue :: new (
218- "cleanup_mode" ,
219- format ! ( "{:?}" , cleanup_mode) ,
220- ) ] ,
173+ let orchestrator =
174+ crate :: garbage_collector_orchestrator_v2:: GarbageCollectorOrchestrator :: new (
175+ collection. id ,
176+ collection. version_file_path ,
177+ collection. lineage_file_path ,
178+ version_absolute_cutoff_time,
179+ collection_soft_delete_absolute_cutoff_time,
180+ self . sysdb_client . clone ( ) ,
181+ dispatcher. clone ( ) ,
182+ system. clone ( ) ,
183+ self . storage . clone ( ) ,
184+ self . logs . clone ( ) ,
185+ self . root_manager . clone ( ) ,
186+ cleanup_mode,
187+ self . config . min_versions_to_keep ,
188+ enable_log_gc,
189+ enable_dangerous_option_to_ignore_min_versions_for_wal3,
221190 ) ;
222191
223- return Ok ( result) ;
224- }
225-
226- let orchestrator = crate :: garbage_collector_orchestrator:: GarbageCollectorOrchestrator :: new (
227- collection. id ,
228- collection. version_file_path ,
229- version_absolute_cutoff_time,
230- self . sysdb_client . clone ( ) ,
231- dispatcher. clone ( ) ,
232- self . storage . clone ( ) ,
233- cleanup_mode,
234- ) ;
235-
236192 let started_at = SystemTime :: now ( ) ;
237- let result = orchestrator. run ( system. clone ( ) ) . await ?;
193+ let result = match orchestrator. run ( system. clone ( ) ) . await {
194+ Ok ( res) => res,
195+ Err ( e) => {
196+ tracing:: error!( "Failed to run garbage collection orchestrator v2: {:?}" , e) ;
197+ return Err ( GarbageCollectCollectionError :: OrchestratorV2Error ( e) ) ;
198+ }
199+ } ;
238200 let duration_ms = started_at
239201 . elapsed ( )
240202 . map ( |d| d. as_millis ( ) as u64 )
@@ -254,6 +216,7 @@ impl GarbageCollector {
254216 format ! ( "{:?}" , cleanup_mode) ,
255217 ) ] ,
256218 ) ;
219+
257220 Ok ( result)
258221 }
259222
@@ -390,7 +353,6 @@ impl Handler<ManualGarbageCollectionRequest> for GarbageCollector {
390353struct GarbageCollectResult {
391354 num_completed_jobs : u32 ,
392355 num_failed_jobs : u32 ,
393- num_skipped_jobs : u32 ,
394356 num_hard_deleted_databases : u32 ,
395357}
396358
@@ -492,32 +454,23 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
492454 }
493455 }
494456
495- let mut num_skipped_jobs = 0 ;
496- let collections_to_gc = collections_to_gc. into_iter ( ) . map ( |collection| {
497- let cleanup_mode = if let Some ( tenant_mode_overrides) = & self . config . tenant_mode_overrides {
498- tenant_mode_overrides
499- . get ( & collection. tenant )
500- . cloned ( )
501- . unwrap_or ( self . config . default_mode )
502- } else {
503- self . config . default_mode
504- } ;
505-
506- ( cleanup_mode. to_owned ( ) , collection)
507- } ) . filter ( |( cleanup_mode, collection) | {
508- if collection. lineage_file_path . is_some ( ) && !cleanup_mode. is_v2 ( ) {
509- tracing:: debug!(
510- "Skipping garbage collection for root of fork tree because GC v1 cannot handle fork trees: {}" ,
511- collection. id
512- ) ;
513- num_skipped_jobs += 1 ;
514- return false ;
515- }
516-
517- true
518- } )
519- . take ( self . config . max_collections_to_gc as usize )
520- . collect :: < Vec < _ > > ( ) ;
457+ let collections_to_gc = collections_to_gc
458+ . into_iter ( )
459+ . map ( |collection| {
460+ let cleanup_mode =
461+ if let Some ( tenant_mode_overrides) = & self . config . tenant_mode_overrides {
462+ tenant_mode_overrides
463+ . get ( & collection. tenant )
464+ . cloned ( )
465+ . unwrap_or ( self . config . default_mode )
466+ } else {
467+ self . config . default_mode
468+ } ;
469+
470+ ( cleanup_mode. to_owned ( ) , collection)
471+ } )
472+ . take ( self . config . max_collections_to_gc as usize )
473+ . collect :: < Vec < _ > > ( ) ;
521474
522475 tracing:: info!(
523476 "Filtered to {} collections to garbage collect" ,
@@ -640,7 +593,6 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
640593 return GarbageCollectResult {
641594 num_completed_jobs,
642595 num_failed_jobs,
643- num_skipped_jobs,
644596 num_hard_deleted_databases : num_hard_deleted_databases as u32 ,
645597 } ;
646598 }
@@ -867,134 +819,6 @@ mod tests {
867819 ( collection_id, database_name)
868820 }
869821
870- #[ tokio:: test]
871- #[ traced_test]
872- async fn test_k8s_integration_ignores_forked_collections ( ) {
873- let tenant_id = format ! ( "tenant-{}" , Uuid :: new_v4( ) ) ;
874- let tenant_mode_overrides = HashMap :: from ( [ ( tenant_id. clone ( ) , CleanupMode :: Delete ) ] ) ;
875-
876- let config = GarbageCollectorConfig {
877- service_name : "gc" . to_string ( ) ,
878- otel_endpoint : "none" . to_string ( ) ,
879- otel_filters : vec ! [ OtelFilter {
880- crate_name: "garbage_collector" . to_string( ) ,
881- filter_level: OtelFilterLevel :: Debug ,
882- } ] ,
883- version_cutoff_time : Duration :: from_secs ( 1 ) ,
884- collection_soft_delete_grace_period : Duration :: from_secs ( 1 ) ,
885- max_collections_to_gc : 100 ,
886- max_collections_to_fetch : None ,
887- gc_interval_mins : 10 ,
888- disallow_collections : HashSet :: new ( ) ,
889- min_versions_to_keep : 2 ,
890- filter_min_versions_if_alive : None ,
891- sysdb_config : GrpcSysDbConfig {
892- host : "localhost" . to_string ( ) ,
893- port : 50051 ,
894- connect_timeout_ms : 5000 ,
895- request_timeout_ms : 10000 ,
896- num_channels : 1 ,
897- } ,
898- dispatcher_config : DispatcherConfig :: default ( ) ,
899- storage_config : s3_config_for_localhost_with_bucket_name ( "chroma-storage" ) . await ,
900- default_mode : CleanupMode :: DryRun ,
901- tenant_mode_overrides : Some ( tenant_mode_overrides) ,
902- assignment_policy : chroma_config:: assignment:: config:: AssignmentPolicyConfig :: default ( ) ,
903- my_member_id : "test-gc" . to_string ( ) ,
904- memberlist_provider : chroma_memberlist:: config:: MemberlistProviderConfig :: default ( ) ,
905- port : 50055 ,
906- root_cache_config : Default :: default ( ) ,
907- jemalloc_pprof_server_port : None ,
908- log : LogConfig :: Grpc ( GrpcLogConfig :: default ( ) ) ,
909- enable_log_gc_for_tenant : Vec :: new ( ) ,
910- enable_log_gc_for_tenant_threshold : "ffffffff-ffff-ffff-ffff-ffffffffffff" . to_string ( ) ,
911- enable_dangerous_option_to_ignore_min_versions_for_wal3 : false ,
912- } ;
913- let registry = Registry :: new ( ) ;
914-
915- // Create collection
916- let mut clients = ChromaGrpcClients :: new ( ) . await . unwrap ( ) ;
917-
918- let ( collection_id, _) = create_test_collection ( tenant_id. clone ( ) , & mut clients) . await ;
919- let mut sysdb = SysDb :: Grpc (
920- GrpcSysDb :: try_from_config ( & config. sysdb_config , & registry)
921- . await
922- . unwrap ( ) ,
923- ) ;
924- let collections = sysdb
925- . get_collections ( GetCollectionsOptions {
926- collection_id : Some ( collection_id) ,
927- ..Default :: default ( )
928- } )
929- . await
930- . unwrap ( ) ;
931- let collection = collections. first ( ) . unwrap ( ) ;
932- // Fork collection
933- sysdb
934- . fork_collection (
935- collection_id,
936- collection. log_position as u64 ,
937- collection. log_position as u64 ,
938- CollectionUuid :: new ( ) ,
939- "test-fork" . to_string ( ) ,
940- )
941- . await
942- . unwrap ( ) ;
943-
944- // Wait 1 second for cutoff time
945- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
946-
947- // Run garbage collection
948- let system = System :: new ( ) ;
949- let mut garbage_collector_component =
950- GarbageCollector :: try_from_config ( & ( config. clone ( ) , system. clone ( ) ) , & registry)
951- . await
952- . unwrap ( ) ;
953-
954- let dispatcher = Dispatcher :: try_from_config ( & config. dispatcher_config , & registry)
955- . await
956- . unwrap ( ) ;
957-
958- let dispatcher_handle = system. start_component ( dispatcher) ;
959-
960- garbage_collector_component. set_dispatcher ( dispatcher_handle) ;
961- garbage_collector_component. set_system ( system. clone ( ) ) ;
962- let mut garbage_collector_handle = system. start_component ( garbage_collector_component) ;
963-
964- garbage_collector_handle
965- . send (
966- vec ! [ Member {
967- member_id: "test-gc" . to_string( ) ,
968- member_ip: "0.0.0.0" . to_string( ) ,
969- member_node_name: "test-gc-node" . to_string( ) ,
970- } ] ,
971- None ,
972- )
973- . await
974- . unwrap ( ) ;
975-
976- let result = garbage_collector_handle
977- . request (
978- GarbageCollectMessage {
979- tenant : Some ( tenant_id. clone ( ) ) ,
980- } ,
981- Some ( Span :: current ( ) ) ,
982- )
983- . await
984- . unwrap ( ) ;
985-
986- // Should have skipped
987- assert_eq ! (
988- result,
989- GarbageCollectResult {
990- num_completed_jobs: 0 ,
991- num_failed_jobs: 0 ,
992- num_skipped_jobs: 1 ,
993- num_hard_deleted_databases: 0 ,
994- }
995- ) ;
996- }
997-
998822 #[ tokio:: test]
999823 #[ traced_test]
1000824 async fn test_k8s_integration_tenant_mode_override ( ) {
@@ -1003,7 +827,7 @@ mod tests {
1003827 let tenant_id_for_dry_run_mode = format ! ( "tenant-dry-run-mode-{}" , Uuid :: new_v4( ) ) ;
1004828
1005829 let mut tenant_mode_overrides = HashMap :: new ( ) ;
1006- tenant_mode_overrides. insert ( tenant_id_for_delete_mode. clone ( ) , CleanupMode :: Delete ) ;
830+ tenant_mode_overrides. insert ( tenant_id_for_delete_mode. clone ( ) , CleanupMode :: DeleteV2 ) ;
1007831
1008832 let config = GarbageCollectorConfig {
1009833 service_name : "gc" . to_string ( ) ,
@@ -1029,7 +853,7 @@ mod tests {
1029853 } ,
1030854 dispatcher_config : DispatcherConfig :: default ( ) ,
1031855 storage_config : s3_config_for_localhost_with_bucket_name ( "chroma-storage" ) . await ,
1032- default_mode : CleanupMode :: DryRun ,
856+ default_mode : CleanupMode :: DryRunV2 ,
1033857 tenant_mode_overrides : Some ( tenant_mode_overrides) ,
1034858 assignment_policy : chroma_config:: assignment:: config:: AssignmentPolicyConfig :: default ( ) ,
1035859 my_member_id : "test-gc" . to_string ( ) ,
@@ -1341,7 +1165,6 @@ mod tests {
13411165 GarbageCollectResult {
13421166 num_completed_jobs: 1 ,
13431167 num_failed_jobs: 0 ,
1344- num_skipped_jobs: 0 ,
13451168 num_hard_deleted_databases: 0 , // The database should not have been hard deleted yet
13461169 }
13471170 ) ;
@@ -1364,7 +1187,6 @@ mod tests {
13641187 GarbageCollectResult {
13651188 num_completed_jobs: 1 ,
13661189 num_failed_jobs: 0 ,
1367- num_skipped_jobs: 0 ,
13681190 num_hard_deleted_databases: 1 , // The database should have been hard deleted
13691191 }
13701192 ) ;
0 commit comments