Contributor

@codetheweb codetheweb commented Oct 22, 2025

Description of changes

When running the garbage collector service, we sometimes see it run out of memory (OOM). This is because ListFilesAtVersionsOperator can allocate a lot of memory and currently runs with unbounded concurrency.

This PR adds a new config parameter max_concurrent_list_files_operations_per_collection to limit the concurrency of this operator. Doing this required a bit of restructuring in the GC orchestrator.
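
To make the effect of such a limit concrete, here is a minimal sketch of bounded concurrency. It is not the PR's implementation: it swaps in plain `std` threads pulling from a shared queue, whereas the orchestrator is async. The names `run_bounded` and `op` are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Runs `op` over every item with at most `limit` invocations in flight.
/// Returns the results (in completion order) and the peak number of
/// invocations that were observed running at the same time.
/// NOTE: hypothetical helper for illustration, not part of the GC crate.
fn run_bounded<T, R, F>(items: Vec<T>, limit: usize, op: F) -> (Vec<R>, usize)
where
    T: Send,
    R: Send,
    F: Fn(T) -> R + Sync,
{
    let queue = Mutex::new(VecDeque::from(items));
    let results = Mutex::new(Vec::new());
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);

    thread::scope(|scope| {
        // A fixed pool of `limit` workers (at least one), so no more than
        // `limit` calls to `op` can overlap.
        for _ in 0..limit.max(1) {
            scope.spawn(|| loop {
                // Take the next item; the lock is released before `op` runs.
                let next = queue.lock().unwrap().pop_front();
                let Some(item) = next else { break };

                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                let out = op(item);
                in_flight.fetch_sub(1, Ordering::SeqCst);

                results.lock().unwrap().push(out);
            });
        }
    });

    (results.into_inner().unwrap(), peak.into_inner())
}
```

With a limit of 3, peak memory is bounded by roughly three operations' worth of allocations, however many versions are queued.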

Test plan

How are these changes tested?

  • Tests pass locally with pytest for python, yarn test for js, cargo test for rust

Migration plan

Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?

Observability plan

What is the plan to instrument and monitor this change?

Documentation Changes

Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?

Contributor Author

codetheweb commented Oct 22, 2025

This stack of pull requests is managed by Graphite. Learn more about stacking.

@github-actions

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of an unexpectedly high quality (Readability, Modularity, Intuitiveness)?

@codetheweb codetheweb force-pushed the feat-gc-limit-operation-concurrency branch 4 times, most recently from 418aef2 to 4380f03 Compare October 22, 2025 19:57
@codetheweb codetheweb marked this pull request as ready for review October 22, 2025 20:02
@propel-code-bot
Contributor

propel-code-bot bot commented Oct 22, 2025

Introduce Concurrency Limit for GC ListFiles Operations to Reduce Memory Usage

This PR introduces a configurable concurrency limit for the ListFilesAtVersionsOperator invoked by the garbage collector (GC) service. A new config parameter max_concurrent_list_files_operations_per_collection is added to restrict the number of concurrent list files operations per collection, targeting observed out-of-memory (OOM) issues in GC runs due to previously unbounded concurrency. Implementation of the concurrency control required notable refactoring of the GC orchestrator logic, particularly consolidating and simplifying the stage that handles marking versions at sysdb and launching list files operators. The code also removes the now-redundant mark_versions_at_sysdb step and its operator, and reorganizes related logic to leverage direct sysdb client calls.

Key Changes

• Added config option max_concurrent_list_files_operations_per_collection to the GC config; default set to 10.
• Refactored GarbageCollectorOrchestrator to coordinate marking versions as deleted in sysdb and then spawning list files tasks with bounded concurrency using futures::stream::StreamExt::buffer_unordered.
• Removed the separate mark_versions_at_sysdb operator, collapsing its logic directly into the orchestrator.
• Updated affected orchestrator handler signatures and streamlined task dispatch for list files and subsequent file deletion steps.
• Modified function/thread signatures and test utility invocations to account for the new concurrency parameter.
• Introduced OneshotMessageReceiver for improved async message handling in Rust's actor-like system.
• Updated multiple sites across CLI tooling, tests, and configuration parsing to pass and utilize the new concurrency limit.

Affected Areas

• rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs - major refactor and logic update
• rust/garbage_collector/src/config.rs - config struct and defaults update
• rust/garbage_collector/src/bin/garbage_collector_tool.rs - CLI updated to accept/use the new param
• rust/system/src/receiver.rs - new message receiver infrastructure for oneshot channels
• rust/garbage_collector/src/operators/mark_versions_at_sysdb.rs - deleted
• Test helpers and orchestrator construction in various test modules

This summary was automatically generated by @propel-code-bot

#[serde(
    default = "GarbageCollectorConfig::default_max_concurrent_list_files_operations_per_collection"
)]
pub(super) max_concurrent_list_files_operations_per_collection: usize,
Contributor

[BestPractice]

The new configuration field max_concurrent_list_files_operations_per_collection has pub(super) visibility, which is inconsistent with other pub fields like min_versions_to_keep. This prevents garbage_collector_tool.rs from accessing this value from the loaded config, forcing it to use a hardcoded value.

Please consider making this field pub for consistency. This will also allow the tool to use the configured value.

Suggested change
pub(super) max_concurrent_list_files_operations_per_collection: usize,
pub max_concurrent_list_files_operations_per_collection: usize,

File: rust/garbage_collector/src/config.rs
Line: 40
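
A minimal sketch of the visibility difference described in this comment, under a simplified layout. The module `gc` stands in for the library crate and the code outside it stands in for the separate tool binary. The field types, the example values, and the helper names are assumptions for illustration only.

```rust
mod gc {
    pub mod config {
        pub struct GarbageCollectorConfig {
            // Readable from anywhere that can name the struct.
            pub min_versions_to_keep: u32,
            // Readable only inside `gc` (the parent of `config`) and its descendants.
            pub(super) max_concurrent_list_files_operations_per_collection: usize,
        }

        impl GarbageCollectorConfig {
            /// Hypothetical constructor with made-up values.
            pub fn example() -> Self {
                Self {
                    min_versions_to_keep: 2,
                    max_concurrent_list_files_operations_per_collection: 10,
                }
            }
        }
    }

    // Inside `gc`, so the `pub(super)` field is visible here.
    pub fn limit_seen_inside_gc(config: &config::GarbageCollectorConfig) -> usize {
        config.max_concurrent_list_files_operations_per_collection
    }
}

// Code outside `gc` can read the `pub` field...
fn min_versions_seen_by_tool(config: &gc::config::GarbageCollectorConfig) -> u32 {
    config.min_versions_to_keep
    // ...but reading `config.max_concurrent_list_files_operations_per_collection`
    // here would be rejected by the compiler (E0616, private field).
}
```

Widening the field to `pub` removes that restriction, which is what lets a caller outside the module pass the configured value instead of a literal.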

config.min_versions_to_keep,
enable_log_gc,
enable_dangerous_option_to_ignore_min_versions_for_wal3,
10,
Contributor

[BestPractice]

This hardcoded value could be replaced by the value from the configuration. I've left a suggestion on rust/garbage_collector/src/config.rs to make the max_concurrent_list_files_operations_per_collection field public. Once that's done, you can use config.max_concurrent_list_files_operations_per_collection here.

Suggested change
10,
config.max_concurrent_list_files_operations_per_collection,

File: rust/garbage_collector/src/bin/garbage_collector_tool.rs
Line: 248

@codetheweb codetheweb force-pushed the feat-gc-limit-operation-concurrency branch from 4380f03 to a9a8189 Compare October 22, 2025 21:17
None => return,
};
let res = self.handle_list_files_at_version_output(output, ctx).await;
tracing::info!("res: {:?}", res);
Contributor

[BestPractice]

Debug logging statement left in production code. This appears to be temporary debugging code that should be removed before merging.

Suggested change
tracing::info!("res: {:?}", res);
self.ok_or_terminate(res, ctx).await;

File: rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs
Line: 1042

@codetheweb codetheweb changed the base branch from cln-remove-gc-v1 to graphite-base/5720 October 22, 2025 22:38
@codetheweb codetheweb force-pushed the feat-gc-limit-operation-concurrency branch from a9a8189 to 468e9cc Compare October 22, 2025 22:38
@graphite-app graphite-app bot changed the base branch from graphite-base/5720 to main October 22, 2025 22:38
@codetheweb codetheweb force-pushed the feat-gc-limit-operation-concurrency branch from 468e9cc to d63ad47 Compare October 22, 2025 22:38
Comment on lines +402 to +409
let mut stream = futures::stream::iter(output.versions.into_iter().flat_map(
    |(collection_id, versions)| {
        versions
            .keys()
            .map(|version| (collection_id, *version))
            .collect::<Vec<_>>()
    },
))
Contributor

[PerformanceOptimization]

For a minor performance improvement, you can avoid collecting into a Vec inside the flat_map. The flat_map can directly work with the iterator returned by map, which avoids an intermediate allocation for each collection's versions.

File: rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs
Line: 409
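
A sketch of what this suggestion could look like, with simplified stand-in types (`u32` collection ids, `i64` versions, `String` payloads), none of which come from the PR. One caveat: simply dropping `.collect()` from the snippet above would likely not compile, because `versions.keys()` borrows a value owned by the closure. Consuming the map with `into_keys()` and using a `move` closure avoids that.

```rust
use std::collections::HashMap;

/// Flattens `collection -> (version -> payload)` into `(collection, version)`
/// pairs without building a temporary `Vec` per collection.
/// NOTE: hypothetical helper; the key and value types are placeholders.
fn flatten_versions(
    versions_by_collection: HashMap<u32, HashMap<i64, String>>,
) -> impl Iterator<Item = (u32, i64)> {
    versions_by_collection
        .into_iter()
        .flat_map(|(collection_id, versions)| {
            // `into_keys` consumes the inner map, so the returned iterator owns
            // its data; `move` copies `collection_id` into the inner closure.
            versions
                .into_keys()
                .map(move |version| (collection_id, version))
        })
}
```

This relies on the collection id being `Copy` (or cheap to clone), since each produced pair needs its own copy.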

Comment on lines +93 to +95
sender
    .send(message)
    .map_err(|_| ChannelError::SendError("Failed to send message".to_string()))?;
Contributor

[BestPractice]

The error from oneshot::Sender::send occurs when the receiver has been dropped, indicating the receiver is no longer interested in the response. The current error message is generic. A more specific message would help debugging. The send method returns Result<(), T> where the Err(T) contains the original message on failure. You can ignore this with let _ = tx.send(value); or explicitly acknowledge it with if let Err(_message) = tx.send(value) { /* handle receiver dropped */ } to make the intention clear.

File: rust/system/src/receiver.rs
Line: 95
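
A sketch of a more specific error for the dropped-receiver case. It uses `std::sync::mpsc` as a stand-in for the oneshot channel in the PR, since `mpsc::Sender::send` also returns the unsent message inside its error when the receiver is gone. The `ChannelError` enum below is a local placeholder mirroring the variant name in the snippet, and `send_reply` is a hypothetical helper.

```rust
use std::sync::mpsc;

/// Local placeholder for the crate's error type (assumption).
#[derive(Debug, PartialEq)]
enum ChannelError {
    SendError(String),
}

/// Sends `message`, turning a dropped receiver into a descriptive error.
fn send_reply(sender: &mpsc::Sender<String>, message: String) -> Result<(), ChannelError> {
    sender.send(message).map_err(|mpsc::SendError(unsent)| {
        // The error hands the unsent message back; say why the send failed.
        ChannelError::SendError(format!(
            "receiver dropped before the reply could be delivered ({} bytes unsent)",
            unsent.len()
        ))
    })
}
```

Whether a dropped receiver should be an error at all, or silently ignored with `let _ = ...`, depends on whether the caller still cares about the reply.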

@@ -1,163 +0,0 @@
use async_trait::async_trait;
Collaborator

was this just unused?

Contributor Author

yes, inlined into rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

@codetheweb codetheweb merged commit 0f1292e into main Oct 28, 2025
62 checks passed
@codetheweb codetheweb deleted the feat-gc-limit-operation-concurrency branch October 28, 2025 16:46