-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH]: limit concurrency on operators spawned by GC #5720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
418aef2 to
4380f03
Compare
|
Introduce Concurrency Limit for GC ListFiles Operations to Reduce Memory Usage This PR introduces a configurable concurrency limit for the Key Changes• Added config option Affected Areas• This summary was automatically generated by @propel-code-bot |
| #[serde( | ||
| default = "GarbageCollectorConfig::default_max_concurrent_list_files_operations_per_collection" | ||
| )] | ||
| pub(super) max_concurrent_list_files_operations_per_collection: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The new configuration field max_concurrent_list_files_operations_per_collection has pub(super) visibility, which is inconsistent with other pub fields like min_versions_to_keep. This prevents garbage_collector_tool.rs from accessing this value from the loaded config, forcing it to use a hardcoded value.
Please consider making this field pub for consistency. This will also allow the tool to use the configured value.
| pub(super) max_concurrent_list_files_operations_per_collection: usize, | |
| pub max_concurrent_list_files_operations_per_collection: usize, |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
The new configuration field `max_concurrent_list_files_operations_per_collection` has `pub(super)` visibility, which is inconsistent with other `pub` fields like `min_versions_to_keep`. This prevents `garbage_collector_tool.rs` from accessing this value from the loaded config, forcing it to use a hardcoded value.
Please consider making this field `pub` for consistency. This will also allow the tool to use the configured value.
```suggestion
pub max_concurrent_list_files_operations_per_collection: usize,
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/garbage_collector/src/config.rs
Line: 40| config.min_versions_to_keep, | ||
| enable_log_gc, | ||
| enable_dangerous_option_to_ignore_min_versions_for_wal3, | ||
| 10, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
This hardcoded value could be replaced by the value from the configuration. I've left a suggestion on rust/garbage_collector/src/config.rs to make the max_concurrent_list_files_operations_per_collection field public. Once that's done, you can use config.max_concurrent_list_files_operations_per_collection here.
| 10, | |
| config.max_concurrent_list_files_operations_per_collection, |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
This hardcoded value could be replaced by the value from the configuration. I've left a suggestion on `rust/garbage_collector/src/config.rs` to make the `max_concurrent_list_files_operations_per_collection` field public. Once that's done, you can use `config.max_concurrent_list_files_operations_per_collection` here.
```suggestion
config.max_concurrent_list_files_operations_per_collection,
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/garbage_collector/src/bin/garbage_collector_tool.rs
Line: 2484380f03 to
a9a8189
Compare
| None => return, | ||
| }; | ||
| let res = self.handle_list_files_at_version_output(output, ctx).await; | ||
| tracing::info!("res: {:?}", res); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Debug logging statement left in production code. This appears to be temporary debugging code that should be removed before merging.
| tracing::info!("res: {:?}", res); | |
| self.ok_or_terminate(res, ctx).await; |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
Debug logging statement left in production code. This appears to be temporary debugging code that should be removed before merging.
```suggestion
self.ok_or_terminate(res, ctx).await;
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs
Line: 10428646eac to
4545ab3
Compare
a9a8189 to
468e9cc
Compare
468e9cc to
d63ad47
Compare
| let mut stream = futures::stream::iter(output.versions.into_iter().flat_map( | ||
| |(collection_id, versions)| { | ||
| versions | ||
| .keys() | ||
| .map(|version| (collection_id, *version)) | ||
| .collect::<Vec<_>>() | ||
| }, | ||
| )) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[PerformanceOptimization]
For a minor performance improvement, you can avoid collecting into a Vec inside the flat_map. The flat_map can directly work with the iterator returned by map, which avoids an intermediate allocation for each collection's versions.
Context for Agents
[**PerformanceOptimization**]
For a minor performance improvement, you can avoid collecting into a `Vec` inside the `flat_map`. The `flat_map` can directly work with the iterator returned by `map`, which avoids an intermediate allocation for each collection's versions.
File: rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs
Line: 409| sender | ||
| .send(message) | ||
| .map_err(|_| ChannelError::SendError("Failed to send message".to_string()))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The error from oneshot::Sender::send occurs when the receiver has been dropped, indicating the receiver is no longer interested in the response. The current error message is generic. A more specific message would help debugging. The send method returns Result<(), T> where the Err(T) contains the original message on failure. You can ignore this with let _ = tx.send(value); or explicitly acknowledge it with if let Err(_message) = tx.send(value) { /* handle receiver dropped */ } to make the intention clear.
Context for Agents
[**BestPractice**]
The error from `oneshot::Sender::send` occurs when the receiver has been dropped, indicating the receiver is no longer interested in the response. The current error message is generic. A more specific message would help debugging. The `send` method returns `Result<(), T>` where the `Err(T)` contains the original message on failure. You can ignore this with `let _ = tx.send(value);` or explicitly acknowledge it with `if let Err(_message) = tx.send(value) { /* handle receiver dropped */ }` to make the intention clear.
File: rust/system/src/receiver.rs
Line: 95| @@ -1,163 +0,0 @@ | |||
| use async_trait::async_trait; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as this just unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, inlined into rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Description of changes
When running the garbage collector service, we see it sometimes OOM. This is because
ListFilesAtVersionsOperatorcan allocate a lot of memory and currently has unbounded concurrency.This PR adds a new config parameter
max_concurrent_list_files_operations_per_collectionto limit the concurrency of this operator. Doing this required a bit of restructuring in the GC orchestrator.Test plan
How are these changes tested?
pytestfor python,yarn testfor js,cargo testfor rustMigration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?