Skip to content

Commit 36e614c

Browse files
committed
[CLN]: remove v1 of garbage collection
1 parent 4212a46 commit 36e614c

File tree

9 files changed

+67
-2477
lines changed

9 files changed

+67
-2477
lines changed

rust/garbage_collector/src/garbage_collector_component.rs

Lines changed: 49 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ enum GarbageCollectCollectionError {
6262
#[error("Uninitialized: missing dispatcher or system")]
6363
Uninitialized,
6464
#[error("Failed to run garbage collection orchestrator: {0}")]
65-
OrchestratorError(#[from] crate::garbage_collector_orchestrator::GarbageCollectorError),
66-
#[error("Failed to run garbage collection orchestrator: {0}")]
6765
OrchestratorV2Error(#[from] crate::garbage_collector_orchestrator_v2::GarbageCollectorError),
6866
}
6967

@@ -166,75 +164,39 @@ impl GarbageCollector {
166164
.as_ref()
167165
.ok_or(GarbageCollectCollectionError::Uninitialized)?;
168166

169-
if cleanup_mode.is_v2() {
170-
let enable_log_gc = collection.tenant <= self.config.enable_log_gc_for_tenant_threshold
171-
|| self
172-
.config
173-
.enable_log_gc_for_tenant
174-
.contains(&collection.tenant);
167+
let enable_log_gc = collection.tenant <= self.config.enable_log_gc_for_tenant_threshold
168+
|| self
169+
.config
170+
.enable_log_gc_for_tenant
171+
.contains(&collection.tenant);
175172

176-
let orchestrator =
177-
crate::garbage_collector_orchestrator_v2::GarbageCollectorOrchestrator::new(
178-
collection.id,
179-
collection.version_file_path,
180-
collection.lineage_file_path,
181-
version_absolute_cutoff_time,
182-
collection_soft_delete_absolute_cutoff_time,
183-
self.sysdb_client.clone(),
184-
dispatcher.clone(),
185-
system.clone(),
186-
self.storage.clone(),
187-
self.logs.clone(),
188-
self.root_manager.clone(),
189-
cleanup_mode,
190-
self.config.min_versions_to_keep,
191-
enable_log_gc,
192-
enable_dangerous_option_to_ignore_min_versions_for_wal3,
193-
);
194-
195-
let started_at = SystemTime::now();
196-
let result = match orchestrator.run(system.clone()).await {
197-
Ok(res) => res,
198-
Err(e) => {
199-
tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e);
200-
return Err(GarbageCollectCollectionError::OrchestratorV2Error(e));
201-
}
202-
};
203-
let duration_ms = started_at
204-
.elapsed()
205-
.map(|d| d.as_millis() as u64)
206-
.unwrap_or(0);
207-
self.job_duration_ms_metric.record(duration_ms, &[]);
208-
self.total_files_deleted_metric.add(
209-
result.num_files_deleted as u64,
210-
&[opentelemetry::KeyValue::new(
211-
"cleanup_mode",
212-
format!("{:?}", cleanup_mode),
213-
)],
214-
);
215-
self.total_versions_deleted_metric.add(
216-
result.num_versions_deleted as u64,
217-
&[opentelemetry::KeyValue::new(
218-
"cleanup_mode",
219-
format!("{:?}", cleanup_mode),
220-
)],
173+
let orchestrator =
174+
crate::garbage_collector_orchestrator_v2::GarbageCollectorOrchestrator::new(
175+
collection.id,
176+
collection.version_file_path,
177+
collection.lineage_file_path,
178+
version_absolute_cutoff_time,
179+
collection_soft_delete_absolute_cutoff_time,
180+
self.sysdb_client.clone(),
181+
dispatcher.clone(),
182+
system.clone(),
183+
self.storage.clone(),
184+
self.logs.clone(),
185+
self.root_manager.clone(),
186+
cleanup_mode,
187+
self.config.min_versions_to_keep,
188+
enable_log_gc,
189+
enable_dangerous_option_to_ignore_min_versions_for_wal3,
221190
);
222191

223-
return Ok(result);
224-
}
225-
226-
let orchestrator = crate::garbage_collector_orchestrator::GarbageCollectorOrchestrator::new(
227-
collection.id,
228-
collection.version_file_path,
229-
version_absolute_cutoff_time,
230-
self.sysdb_client.clone(),
231-
dispatcher.clone(),
232-
self.storage.clone(),
233-
cleanup_mode,
234-
);
235-
236192
let started_at = SystemTime::now();
237-
let result = orchestrator.run(system.clone()).await?;
193+
let result = match orchestrator.run(system.clone()).await {
194+
Ok(res) => res,
195+
Err(e) => {
196+
tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e);
197+
return Err(GarbageCollectCollectionError::OrchestratorV2Error(e));
198+
}
199+
};
238200
let duration_ms = started_at
239201
.elapsed()
240202
.map(|d| d.as_millis() as u64)
@@ -254,6 +216,7 @@ impl GarbageCollector {
254216
format!("{:?}", cleanup_mode),
255217
)],
256218
);
219+
257220
Ok(result)
258221
}
259222

@@ -390,7 +353,6 @@ impl Handler<ManualGarbageCollectionRequest> for GarbageCollector {
390353
struct GarbageCollectResult {
391354
num_completed_jobs: u32,
392355
num_failed_jobs: u32,
393-
num_skipped_jobs: u32,
394356
num_hard_deleted_databases: u32,
395357
}
396358

@@ -492,32 +454,23 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
492454
}
493455
}
494456

495-
let mut num_skipped_jobs = 0;
496-
let collections_to_gc = collections_to_gc.into_iter().map(|collection| {
497-
let cleanup_mode = if let Some(tenant_mode_overrides) = &self.config.tenant_mode_overrides {
498-
tenant_mode_overrides
499-
.get(&collection.tenant)
500-
.cloned()
501-
.unwrap_or(self.config.default_mode)
502-
} else {
503-
self.config.default_mode
504-
};
505-
506-
(cleanup_mode.to_owned(), collection)
507-
}).filter(|(cleanup_mode, collection)| {
508-
if collection.lineage_file_path.is_some() && !cleanup_mode.is_v2() {
509-
tracing::debug!(
510-
"Skipping garbage collection for root of fork tree because GC v1 cannot handle fork trees: {}",
511-
collection.id
512-
);
513-
num_skipped_jobs += 1;
514-
return false;
515-
}
516-
517-
true
518-
})
519-
.take(self.config.max_collections_to_gc as usize)
520-
.collect::<Vec<_>>();
457+
let collections_to_gc = collections_to_gc
458+
.into_iter()
459+
.map(|collection| {
460+
let cleanup_mode =
461+
if let Some(tenant_mode_overrides) = &self.config.tenant_mode_overrides {
462+
tenant_mode_overrides
463+
.get(&collection.tenant)
464+
.cloned()
465+
.unwrap_or(self.config.default_mode)
466+
} else {
467+
self.config.default_mode
468+
};
469+
470+
(cleanup_mode.to_owned(), collection)
471+
})
472+
.take(self.config.max_collections_to_gc as usize)
473+
.collect::<Vec<_>>();
521474

522475
tracing::info!(
523476
"Filtered to {} collections to garbage collect",
@@ -640,7 +593,6 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
640593
return GarbageCollectResult {
641594
num_completed_jobs,
642595
num_failed_jobs,
643-
num_skipped_jobs,
644596
num_hard_deleted_databases: num_hard_deleted_databases as u32,
645597
};
646598
}
@@ -867,134 +819,6 @@ mod tests {
867819
(collection_id, database_name)
868820
}
869821

870-
#[tokio::test]
871-
#[traced_test]
872-
async fn test_k8s_integration_ignores_forked_collections() {
873-
let tenant_id = format!("tenant-{}", Uuid::new_v4());
874-
let tenant_mode_overrides = HashMap::from([(tenant_id.clone(), CleanupMode::Delete)]);
875-
876-
let config = GarbageCollectorConfig {
877-
service_name: "gc".to_string(),
878-
otel_endpoint: "none".to_string(),
879-
otel_filters: vec![OtelFilter {
880-
crate_name: "garbage_collector".to_string(),
881-
filter_level: OtelFilterLevel::Debug,
882-
}],
883-
version_cutoff_time: Duration::from_secs(1),
884-
collection_soft_delete_grace_period: Duration::from_secs(1),
885-
max_collections_to_gc: 100,
886-
max_collections_to_fetch: None,
887-
gc_interval_mins: 10,
888-
disallow_collections: HashSet::new(),
889-
min_versions_to_keep: 2,
890-
filter_min_versions_if_alive: None,
891-
sysdb_config: GrpcSysDbConfig {
892-
host: "localhost".to_string(),
893-
port: 50051,
894-
connect_timeout_ms: 5000,
895-
request_timeout_ms: 10000,
896-
num_channels: 1,
897-
},
898-
dispatcher_config: DispatcherConfig::default(),
899-
storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await,
900-
default_mode: CleanupMode::DryRun,
901-
tenant_mode_overrides: Some(tenant_mode_overrides),
902-
assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(),
903-
my_member_id: "test-gc".to_string(),
904-
memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig::default(),
905-
port: 50055,
906-
root_cache_config: Default::default(),
907-
jemalloc_pprof_server_port: None,
908-
log: LogConfig::Grpc(GrpcLogConfig::default()),
909-
enable_log_gc_for_tenant: Vec::new(),
910-
enable_log_gc_for_tenant_threshold: "ffffffff-ffff-ffff-ffff-ffffffffffff".to_string(),
911-
enable_dangerous_option_to_ignore_min_versions_for_wal3: false,
912-
};
913-
let registry = Registry::new();
914-
915-
// Create collection
916-
let mut clients = ChromaGrpcClients::new().await.unwrap();
917-
918-
let (collection_id, _) = create_test_collection(tenant_id.clone(), &mut clients).await;
919-
let mut sysdb = SysDb::Grpc(
920-
GrpcSysDb::try_from_config(&config.sysdb_config, &registry)
921-
.await
922-
.unwrap(),
923-
);
924-
let collections = sysdb
925-
.get_collections(GetCollectionsOptions {
926-
collection_id: Some(collection_id),
927-
..Default::default()
928-
})
929-
.await
930-
.unwrap();
931-
let collection = collections.first().unwrap();
932-
// Fork collection
933-
sysdb
934-
.fork_collection(
935-
collection_id,
936-
collection.log_position as u64,
937-
collection.log_position as u64,
938-
CollectionUuid::new(),
939-
"test-fork".to_string(),
940-
)
941-
.await
942-
.unwrap();
943-
944-
// Wait 1 second for cutoff time
945-
tokio::time::sleep(Duration::from_secs(1)).await;
946-
947-
// Run garbage collection
948-
let system = System::new();
949-
let mut garbage_collector_component =
950-
GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry)
951-
.await
952-
.unwrap();
953-
954-
let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry)
955-
.await
956-
.unwrap();
957-
958-
let dispatcher_handle = system.start_component(dispatcher);
959-
960-
garbage_collector_component.set_dispatcher(dispatcher_handle);
961-
garbage_collector_component.set_system(system.clone());
962-
let mut garbage_collector_handle = system.start_component(garbage_collector_component);
963-
964-
garbage_collector_handle
965-
.send(
966-
vec![Member {
967-
member_id: "test-gc".to_string(),
968-
member_ip: "0.0.0.0".to_string(),
969-
member_node_name: "test-gc-node".to_string(),
970-
}],
971-
None,
972-
)
973-
.await
974-
.unwrap();
975-
976-
let result = garbage_collector_handle
977-
.request(
978-
GarbageCollectMessage {
979-
tenant: Some(tenant_id.clone()),
980-
},
981-
Some(Span::current()),
982-
)
983-
.await
984-
.unwrap();
985-
986-
// Should have skipped
987-
assert_eq!(
988-
result,
989-
GarbageCollectResult {
990-
num_completed_jobs: 0,
991-
num_failed_jobs: 0,
992-
num_skipped_jobs: 1,
993-
num_hard_deleted_databases: 0,
994-
}
995-
);
996-
}
997-
998822
#[tokio::test]
999823
#[traced_test]
1000824
async fn test_k8s_integration_tenant_mode_override() {
@@ -1003,7 +827,7 @@ mod tests {
1003827
let tenant_id_for_dry_run_mode = format!("tenant-dry-run-mode-{}", Uuid::new_v4());
1004828

1005829
let mut tenant_mode_overrides = HashMap::new();
1006-
tenant_mode_overrides.insert(tenant_id_for_delete_mode.clone(), CleanupMode::Delete);
830+
tenant_mode_overrides.insert(tenant_id_for_delete_mode.clone(), CleanupMode::DeleteV2);
1007831

1008832
let config = GarbageCollectorConfig {
1009833
service_name: "gc".to_string(),
@@ -1029,7 +853,7 @@ mod tests {
1029853
},
1030854
dispatcher_config: DispatcherConfig::default(),
1031855
storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await,
1032-
default_mode: CleanupMode::DryRun,
856+
default_mode: CleanupMode::DryRunV2,
1033857
tenant_mode_overrides: Some(tenant_mode_overrides),
1034858
assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(),
1035859
my_member_id: "test-gc".to_string(),
@@ -1341,7 +1165,6 @@ mod tests {
13411165
GarbageCollectResult {
13421166
num_completed_jobs: 1,
13431167
num_failed_jobs: 0,
1344-
num_skipped_jobs: 0,
13451168
num_hard_deleted_databases: 0, // The database should not have been hard deleted yet
13461169
}
13471170
);
@@ -1364,7 +1187,6 @@ mod tests {
13641187
GarbageCollectResult {
13651188
num_completed_jobs: 1,
13661189
num_failed_jobs: 0,
1367-
num_skipped_jobs: 0,
13681190
num_hard_deleted_databases: 1, // The database should have been hard deleted
13691191
}
13701192
);

0 commit comments

Comments
 (0)