-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH]: Add heaptender client to compaction and make finishtask use this #5715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
|
Centralize compaction & FinishTask flows through new Heaptender gRPC client This PR introduces a reusable Rust gRPC client (HeaptenderClient) for the Heaptender S3-heap management service and rewires every compaction-related code path—including the FinishTask operator—to route through this client. By eliminating all ad-hoc direct S3 heap manipulation in the worker, the change establishes a single, typed, observable integration point for heap compaction and clean-up. New configuration knobs and service-discovery wiring are added so both local development and production clusters can locate the Heaptender service. The result is simplified code, improved observability, and a clear migration path for future heap-level optimizations and policies. Because workers now hard-depend on a reachable Heaptender endpoint, the service must be deployed and configured before these binaries are rolled out. No data-plane or schema migration is required. Key Changes• Created s3heap-service client module exposing HeaptenderClient with TLS, retry, and timeout support (tonic/tower based) Affected Areas• rust/s3heap-service (new client crate) This summary was automatically generated by @propel-code-bot |
| ); | ||
|
|
||
| // TODO: Schedule a new task for next nonce by pushing to the heap | ||
| // Schedule a new task for next nonce by pushing to the heap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Idempotency issue: The push operation to schedule the next task run lacks idempotency protection. If this function is called multiple times with the same task (e.g., due to retries or duplicate processing), it will schedule the same task multiple times in the heap, leading to duplicate task execution:
// Add idempotency check before scheduling
if self.is_task_already_scheduled(&input.updated_task.id, &input.updated_task.next_nonce).await? {
tracing::info!("Task already scheduled, skipping heap push");
return Ok(FinishTaskOutput::new());
}Context for Agents
[**CriticalError**]
Idempotency issue: The `push` operation to schedule the next task run lacks idempotency protection. If this function is called multiple times with the same task (e.g., due to retries or duplicate processing), it will schedule the same task multiple times in the heap, leading to duplicate task execution:
```rust
// Add idempotency check before scheduling
if self.is_task_already_scheduled(&input.updated_task.id, &input.updated_task.next_nonce).await? {
tracing::info!("Task already scheduled, skipping heap push");
return Ok(FinishTaskOutput::new());
}
```
File: rust/worker/src/execution/operators/finish_task.rs
Line: 135e69ab46 to
34d705e
Compare
d525f8f to
473df8e
Compare
34d705e to
9306828
Compare
473df8e to
7f68839
Compare
9306828 to
5d96010
Compare
| s3heap::client::GrpcHeapService::try_from_config( | ||
| &(config, system), | ||
| ®istry, | ||
| ) | ||
| .await | ||
| .expect("Failed to create test heap service client - ensure heap service is running on localhost:50052") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Test setup will panic if heap service is unavailable: The expect() will crash tests when the heap service isn't running on localhost:50052:
.expect("Failed to create test heap service client - ensure heap service is running on localhost:50052")For tests, use a mock or make the test conditional:
let heap_service = s3heap::client::GrpcHeapService::try_from_config(
&(config, system),
®istry,
).await;
match heap_service {
Ok(service) => service,
Err(_) => {
eprintln!("Skipping test - heap service not available");
return;
}
}Context for Agents
[**BestPractice**]
Test setup will panic if heap service is unavailable: The `expect()` will crash tests when the heap service isn't running on localhost:50052:
```rust
.expect("Failed to create test heap service client - ensure heap service is running on localhost:50052")
```
For tests, use a mock or make the test conditional:
```rust
let heap_service = s3heap::client::GrpcHeapService::try_from_config(
&(config, system),
®istry,
).await;
match heap_service {
Ok(service) => service,
Err(_) => {
eprintln!("Skipping test - heap service not available");
return;
}
}
```
File: rust/worker/src/execution/operators/finish_task.rs
Line: 2227f68839 to
3c540ae
Compare
5d96010 to
91a10e9
Compare
| }; | ||
|
|
||
| heap_service | ||
| .push( | ||
| vec![schedule], | ||
| &input.updated_task.input_collection_id.to_string(), | ||
| ) | ||
| .await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Missing error handling for heap service operations:
heap_service
.push(
vec![schedule],
&input.updated_task.input_collection_id.to_string(),
)
.await?;If the heap service is unavailable or returns an error, this will cause the entire task finish operation to fail, potentially leaving the task in an inconsistent state. Consider handling heap service errors gracefully:
if let Err(e) = heap_service.push(vec![schedule], key).await {
tracing::warn!("Failed to schedule next task run: {}", e);
// Continue with task completion even if scheduling fails
}Context for Agents
[**BestPractice**]
Missing error handling for heap service operations:
```rust
heap_service
.push(
vec![schedule],
&input.updated_task.input_collection_id.to_string(),
)
.await?;
```
If the heap service is unavailable or returns an error, this will cause the entire task finish operation to fail, potentially leaving the task in an inconsistent state. Consider handling heap service errors gracefully:
```rust
if let Err(e) = heap_service.push(vec![schedule], key).await {
tracing::warn!("Failed to schedule next task run: {}", e);
// Continue with task completion even if scheduling fails
}
```
File: rust/worker/src/execution/operators/finish_task.rs
Line: 15071d0776 to
db61dab
Compare
bcaa45f to
c9cd620
Compare
db61dab to
1426e76
Compare
| let endpoint_res = match Endpoint::from_shared(format!( | ||
| "grpc://rust-log-service-{ordinal}.rust-log-service:50051" | ||
| "grpc://rust-log-service-{ordinal}.rust-log-service:{port}" | ||
| )) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
For consistency with Tonic best practices and other parts of the codebase that create gRPC endpoints, use the http:// scheme for plaintext connections. While grpc:// may work with some gRPC implementations, http:// is the standard scheme for plaintext gRPC connections in Tonic, as documented in official examples and used throughout the Tonic ecosystem. For TLS connections, use https://.
| let endpoint_res = match Endpoint::from_shared(format!( | |
| "grpc://rust-log-service-{ordinal}.rust-log-service:50051" | |
| "grpc://rust-log-service-{ordinal}.rust-log-service:{port}" | |
| )) { | |
| let endpoint_res = match Endpoint::from_shared(format!( | |
| "http://rust-log-service-{ordinal}.rust-log-service:{port}" | |
| )) { |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
For consistency with Tonic best practices and other parts of the codebase that create gRPC endpoints, use the `http://` scheme for plaintext connections. While `grpc://` may work with some gRPC implementations, `http://` is the standard scheme for plaintext gRPC connections in Tonic, as documented in official examples and used throughout the Tonic ecosystem. For TLS connections, use `https://`.
```suggestion
let endpoint_res = match Endpoint::from_shared(format!(
"http://rust-log-service-{ordinal}.rust-log-service:{port}"
)) {
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/log/src/grpc_log.rs
Line: 647There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, propel.
1426e76 to
98006bf
Compare
7e3a65c to
d2c1a19
Compare
| let endpoint_res = match Endpoint::from_shared(format!( | ||
| "grpc://rust-log-service-{ordinal}.rust-log-service:50051" | ||
| "grpc://rust-log-service-{ordinal}.rust-log-service:{port}" | ||
| )) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, propel.
| 1, | ||
| 1000, | ||
| 1000, | ||
| 50051, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with it.
| } | ||
|
|
||
| /// Push schedules to the heap | ||
| #[tracing::instrument(skip(self, schedules))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our pattern in most places is to have the client do conversion to local types. These request and respond with chroma_proto types. Which is fine if we're not going to parse or validate them.
| let heap_service = GrpcHeapService::try_from_config( | ||
| &(GrpcHeapServiceConfig::default(), system.clone()), | ||
| ®istry, | ||
| ) | ||
| .await | ||
| .expect("Should connect to grpc heap service"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test_k8s_integration_ is a symbol that it's to be run with these services running.
d2c1a19 to
62d07a3
Compare
c9cd620 to
1abff18
Compare
62d07a3 to
50ebd25
Compare
50ebd25 to
f7fac1e
Compare
1abff18 to
0ac2957
Compare
f7fac1e to
1d9082e
Compare
| return Ok(GrpcHeapService::new( | ||
| my_config.clone(), | ||
| client_assigner, | ||
| client_manager_handle, | ||
| memberlist_provider_handle, | ||
| )); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The return keyword is redundant here as this is the last expression in the function. It can be removed for idiomatic Rust.
Context for Agents
[**BestPractice**]
The `return` keyword is redundant here as this is the last expression in the function. It can be removed for idiomatic Rust.
File: rust/s3heap-service/src/client/grpc.rs
Line: 205
Description of changes
Summarize the changes made by this PR.
Test plan
How are these changes tested?
pytestfor python,yarn testfor js,cargo testfor rustMigration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the _docs section?_