Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 51 additions & 229 deletions rust/garbage_collector/src/garbage_collector_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ enum GarbageCollectCollectionError {
#[error("Uninitialized: missing dispatcher or system")]
Uninitialized,
#[error("Failed to run garbage collection orchestrator: {0}")]
OrchestratorError(#[from] crate::garbage_collector_orchestrator::GarbageCollectorError),
#[error("Failed to run garbage collection orchestrator: {0}")]
OrchestratorV2Error(#[from] crate::garbage_collector_orchestrator_v2::GarbageCollectorError),
}

Expand Down Expand Up @@ -166,75 +164,39 @@ impl GarbageCollector {
.as_ref()
.ok_or(GarbageCollectCollectionError::Uninitialized)?;

if cleanup_mode.is_v2() {
let enable_log_gc = collection.tenant <= self.config.enable_log_gc_for_tenant_threshold
|| self
.config
.enable_log_gc_for_tenant
.contains(&collection.tenant);
let enable_log_gc = collection.tenant <= self.config.enable_log_gc_for_tenant_threshold
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this? (If not, we could remove it in a separate PR.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. No, I think we can remove this too.
Will do separately.

|| self
.config
.enable_log_gc_for_tenant
.contains(&collection.tenant);

let orchestrator =
crate::garbage_collector_orchestrator_v2::GarbageCollectorOrchestrator::new(
collection.id,
collection.version_file_path,
collection.lineage_file_path,
version_absolute_cutoff_time,
collection_soft_delete_absolute_cutoff_time,
self.sysdb_client.clone(),
dispatcher.clone(),
system.clone(),
self.storage.clone(),
self.logs.clone(),
self.root_manager.clone(),
cleanup_mode,
self.config.min_versions_to_keep,
enable_log_gc,
enable_dangerous_option_to_ignore_min_versions_for_wal3,
);

let started_at = SystemTime::now();
let result = match orchestrator.run(system.clone()).await {
Ok(res) => res,
Err(e) => {
tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e);
return Err(GarbageCollectCollectionError::OrchestratorV2Error(e));
}
};
let duration_ms = started_at
.elapsed()
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
self.job_duration_ms_metric.record(duration_ms, &[]);
self.total_files_deleted_metric.add(
result.num_files_deleted as u64,
&[opentelemetry::KeyValue::new(
"cleanup_mode",
format!("{:?}", cleanup_mode),
)],
);
self.total_versions_deleted_metric.add(
result.num_versions_deleted as u64,
&[opentelemetry::KeyValue::new(
"cleanup_mode",
format!("{:?}", cleanup_mode),
)],
let orchestrator =
crate::garbage_collector_orchestrator_v2::GarbageCollectorOrchestrator::new(
collection.id,
collection.version_file_path,
collection.lineage_file_path,
version_absolute_cutoff_time,
collection_soft_delete_absolute_cutoff_time,
self.sysdb_client.clone(),
dispatcher.clone(),
system.clone(),
self.storage.clone(),
self.logs.clone(),
self.root_manager.clone(),
cleanup_mode,
self.config.min_versions_to_keep,
enable_log_gc,
enable_dangerous_option_to_ignore_min_versions_for_wal3,
);

return Ok(result);
}

let orchestrator = crate::garbage_collector_orchestrator::GarbageCollectorOrchestrator::new(
collection.id,
collection.version_file_path,
version_absolute_cutoff_time,
self.sysdb_client.clone(),
dispatcher.clone(),
self.storage.clone(),
cleanup_mode,
);

let started_at = SystemTime::now();
let result = orchestrator.run(system.clone()).await?;
let result = match orchestrator.run(system.clone()).await {
Ok(res) => res,
Err(e) => {
tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e);
return Err(GarbageCollectCollectionError::OrchestratorV2Error(e));
}
};
let duration_ms = started_at
.elapsed()
.map(|d| d.as_millis() as u64)
Expand All @@ -254,6 +216,7 @@ impl GarbageCollector {
format!("{:?}", cleanup_mode),
)],
);

Ok(result)
}

Expand Down Expand Up @@ -390,7 +353,6 @@ impl Handler<ManualGarbageCollectionRequest> for GarbageCollector {
struct GarbageCollectResult {
num_completed_jobs: u32,
num_failed_jobs: u32,
num_skipped_jobs: u32,
num_hard_deleted_databases: u32,
}

Expand Down Expand Up @@ -492,32 +454,23 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
}
}

let mut num_skipped_jobs = 0;
let collections_to_gc = collections_to_gc.into_iter().map(|collection| {
let cleanup_mode = if let Some(tenant_mode_overrides) = &self.config.tenant_mode_overrides {
tenant_mode_overrides
.get(&collection.tenant)
.cloned()
.unwrap_or(self.config.default_mode)
} else {
self.config.default_mode
};

(cleanup_mode.to_owned(), collection)
}).filter(|(cleanup_mode, collection)| {
if collection.lineage_file_path.is_some() && !cleanup_mode.is_v2() {
tracing::debug!(
"Skipping garbage collection for root of fork tree because GC v1 cannot handle fork trees: {}",
collection.id
);
num_skipped_jobs += 1;
return false;
}

true
})
.take(self.config.max_collections_to_gc as usize)
.collect::<Vec<_>>();
let collections_to_gc = collections_to_gc
.into_iter()
.map(|collection| {
let cleanup_mode =
if let Some(tenant_mode_overrides) = &self.config.tenant_mode_overrides {
tenant_mode_overrides
.get(&collection.tenant)
.cloned()
.unwrap_or(self.config.default_mode)
} else {
self.config.default_mode
};

(cleanup_mode.to_owned(), collection)
})
.take(self.config.max_collections_to_gc as usize)
.collect::<Vec<_>>();

tracing::info!(
"Filtered to {} collections to garbage collect",
Expand Down Expand Up @@ -640,7 +593,6 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
return GarbageCollectResult {
num_completed_jobs,
num_failed_jobs,
num_skipped_jobs,
num_hard_deleted_databases: num_hard_deleted_databases as u32,
};
}
Expand Down Expand Up @@ -867,134 +819,6 @@ mod tests {
(collection_id, database_name)
}

#[tokio::test]
#[traced_test]
async fn test_k8s_integration_ignores_forked_collections() {
let tenant_id = format!("tenant-{}", Uuid::new_v4());
let tenant_mode_overrides = HashMap::from([(tenant_id.clone(), CleanupMode::Delete)]);

let config = GarbageCollectorConfig {
service_name: "gc".to_string(),
otel_endpoint: "none".to_string(),
otel_filters: vec![OtelFilter {
crate_name: "garbage_collector".to_string(),
filter_level: OtelFilterLevel::Debug,
}],
version_cutoff_time: Duration::from_secs(1),
collection_soft_delete_grace_period: Duration::from_secs(1),
max_collections_to_gc: 100,
max_collections_to_fetch: None,
gc_interval_mins: 10,
disallow_collections: HashSet::new(),
min_versions_to_keep: 2,
filter_min_versions_if_alive: None,
sysdb_config: GrpcSysDbConfig {
host: "localhost".to_string(),
port: 50051,
connect_timeout_ms: 5000,
request_timeout_ms: 10000,
num_channels: 1,
},
dispatcher_config: DispatcherConfig::default(),
storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await,
default_mode: CleanupMode::DryRun,
tenant_mode_overrides: Some(tenant_mode_overrides),
assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(),
my_member_id: "test-gc".to_string(),
memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig::default(),
port: 50055,
root_cache_config: Default::default(),
jemalloc_pprof_server_port: None,
log: LogConfig::Grpc(GrpcLogConfig::default()),
enable_log_gc_for_tenant: Vec::new(),
enable_log_gc_for_tenant_threshold: "ffffffff-ffff-ffff-ffff-ffffffffffff".to_string(),
enable_dangerous_option_to_ignore_min_versions_for_wal3: false,
};
let registry = Registry::new();

// Create collection
let mut clients = ChromaGrpcClients::new().await.unwrap();

let (collection_id, _) = create_test_collection(tenant_id.clone(), &mut clients).await;
let mut sysdb = SysDb::Grpc(
GrpcSysDb::try_from_config(&config.sysdb_config, &registry)
.await
.unwrap(),
);
let collections = sysdb
.get_collections(GetCollectionsOptions {
collection_id: Some(collection_id),
..Default::default()
})
.await
.unwrap();
let collection = collections.first().unwrap();
// Fork collection
sysdb
.fork_collection(
collection_id,
collection.log_position as u64,
collection.log_position as u64,
CollectionUuid::new(),
"test-fork".to_string(),
)
.await
.unwrap();

// Wait 1 second for cutoff time
tokio::time::sleep(Duration::from_secs(1)).await;

// Run garbage collection
let system = System::new();
let mut garbage_collector_component =
GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry)
.await
.unwrap();

let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry)
.await
.unwrap();

let dispatcher_handle = system.start_component(dispatcher);

garbage_collector_component.set_dispatcher(dispatcher_handle);
garbage_collector_component.set_system(system.clone());
let mut garbage_collector_handle = system.start_component(garbage_collector_component);

garbage_collector_handle
.send(
vec![Member {
member_id: "test-gc".to_string(),
member_ip: "0.0.0.0".to_string(),
member_node_name: "test-gc-node".to_string(),
}],
None,
)
.await
.unwrap();

let result = garbage_collector_handle
.request(
GarbageCollectMessage {
tenant: Some(tenant_id.clone()),
},
Some(Span::current()),
)
.await
.unwrap();

// Should have skipped
assert_eq!(
result,
GarbageCollectResult {
num_completed_jobs: 0,
num_failed_jobs: 0,
num_skipped_jobs: 1,
num_hard_deleted_databases: 0,
}
);
}

#[tokio::test]
#[traced_test]
async fn test_k8s_integration_tenant_mode_override() {
Expand All @@ -1003,7 +827,7 @@ mod tests {
let tenant_id_for_dry_run_mode = format!("tenant-dry-run-mode-{}", Uuid::new_v4());

let mut tenant_mode_overrides = HashMap::new();
tenant_mode_overrides.insert(tenant_id_for_delete_mode.clone(), CleanupMode::Delete);
tenant_mode_overrides.insert(tenant_id_for_delete_mode.clone(), CleanupMode::DeleteV2);

let config = GarbageCollectorConfig {
service_name: "gc".to_string(),
Expand All @@ -1029,7 +853,7 @@ mod tests {
},
dispatcher_config: DispatcherConfig::default(),
storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await,
default_mode: CleanupMode::DryRun,
default_mode: CleanupMode::DryRunV2,
tenant_mode_overrides: Some(tenant_mode_overrides),
assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(),
my_member_id: "test-gc".to_string(),
Expand Down Expand Up @@ -1140,10 +964,10 @@ mod tests {
.await
.unwrap();

// There should be 3 versions left in delete mode, since the version 1 should have been deleted.
// There should be 2 versions left in delete mode, since the versions 0 and 1 should have been deleted.
assert_eq!(
delete_mode_versions.versions.len(),
3,
2,
"Expected 3 versions in delete mode, found {}",
delete_mode_versions.versions.len()
);
Expand Down Expand Up @@ -1341,7 +1165,6 @@ mod tests {
GarbageCollectResult {
num_completed_jobs: 1,
num_failed_jobs: 0,
num_skipped_jobs: 0,
num_hard_deleted_databases: 0, // The database should not have been hard deleted yet
}
);
Expand All @@ -1364,7 +1187,6 @@ mod tests {
GarbageCollectResult {
num_completed_jobs: 1,
num_failed_jobs: 0,
num_skipped_jobs: 0,
num_hard_deleted_databases: 1, // The database should have been hard deleted
}
);
Expand Down
Loading
Loading