@tanujnay112 tanujnay112 commented Oct 22, 2025

Description of changes

Summarize the changes made by this PR.

  • Improvements & Bug fixes
    • Create a Heaptender client for Rust to use in the compaction code within FinishTask.
  • New functionality
    • ...

Test plan

How are these changes tested?

  • Tests pass locally with pytest for Python, yarn test for JS, cargo test for Rust

Migration plan

Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?

Observability plan

What is the plan to instrument and monitor this change?

Documentation Changes

Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?

@github-actions

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of an unexpectedly high quality (readability, modularity, intuitiveness)?

@tanujnay112 tanujnay112 marked this pull request as ready for review October 22, 2025 08:30

propel-code-bot bot commented Oct 22, 2025

Centralize compaction & FinishTask flows through new Heaptender gRPC client

This PR introduces a reusable Rust gRPC client (HeaptenderClient) for the Heaptender S3-heap management service and rewires every compaction-related code path—including the FinishTask operator—to route through this client. By eliminating all ad-hoc direct S3 heap manipulation in the worker, the change establishes a single, typed, observable integration point for heap compaction and clean-up. New configuration knobs and service-discovery wiring are added so both local development and production clusters can locate the Heaptender service. The result is simplified code, improved observability, and a clear migration path for future heap-level optimizations and policies.

Because workers now hard-depend on a reachable Heaptender endpoint, the service must be deployed and configured before these binaries are rolled out. No data-plane or schema migration is required.

Key Changes

• Created s3heap-service client module exposing HeaptenderClient with TLS, retry, and timeout support (tonic/tower based)
• Replaced direct S3 heap calls in compactor/compaction_manager.rs and execution/operators/finish_task.rs with HeaptenderClient::finish_and_compact
• Added heaptender.{host,port,timeout} configuration sections to worker, frontend, and client crates
• Extended memberlist client_manager to advertise the Heaptender service for discovery, and updated Tiltfile to spin it up in local dev
• Integrated gRPC traffic into existing grpc_log tracing so Heaptender calls are automatically logged/observed

Affected Areas

• rust/s3heap-service (new client crate)
• rust/worker/compactor and execution operators
• rust/frontend executor config
• memberlist service discovery layer
• Dev-ops manifests (Tiltfile) and logging middleware

This summary was automatically generated by @propel-code-bot

);

// TODO: Schedule a new task for next nonce by pushing to the heap
// Schedule a new task for next nonce by pushing to the heap

[**CriticalError**]

Idempotency issue: The `push` operation to schedule the next task run lacks idempotency protection. If this function is called multiple times with the same task (e.g., due to retries or duplicate processing), it will schedule the same task multiple times in the heap, leading to duplicate task execution:

```rust
// Add idempotency check before scheduling
if self.is_task_already_scheduled(&input.updated_task.id, &input.updated_task.next_nonce).await? {
    tracing::info!("Task already scheduled, skipping heap push");
    return Ok(FinishTaskOutput::new());
}
```

File: rust/worker/src/execution/operators/finish_task.rs
Line: 135
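
One way to picture the idempotency guard suggested above is to key each scheduled run by its task ID and nonce and to refuse a second push for the same pair. The sketch below is illustrative only: `ScheduleKey`, `InMemoryHeap`, and `push_once` are hypothetical names, and the in-memory set stands in for whatever deduplication the real heap service would need to provide.

```rust
use std::collections::HashSet;

/// Hypothetical identifier for one scheduled run: (task id, nonce).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ScheduleKey {
    task_id: String,
    nonce: u64,
}

/// Toy in-memory stand-in for the heap (assumption: the real heap is remote).
#[derive(Default)]
struct InMemoryHeap {
    seen: HashSet<ScheduleKey>,
    pushed: Vec<ScheduleKey>,
}

impl InMemoryHeap {
    /// Pushes the schedule only if this (task_id, nonce) pair is new.
    /// Returns true when an entry was actually added.
    fn push_once(&mut self, key: ScheduleKey) -> bool {
        if !self.seen.insert(key.clone()) {
            // Duplicate call, e.g. a retry of the same finish operation.
            return false;
        }
        self.pushed.push(key);
        true
    }
}

fn main() {
    let mut heap = InMemoryHeap::default();
    let key = ScheduleKey { task_id: "task-1".to_string(), nonce: 7 };
    let first = heap.push_once(key.clone());
    let second = heap.push_once(key);
    println!("first push added: {first}, retry added: {second}");
}
```

In a distributed setting the check and the push would have to be atomic on the service side; a client-side check followed by a separate push still leaves a race window.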

@tanujnay112 tanujnay112 changed the base branch from task-api-rust-operators to graphite-base/5715 October 22, 2025 09:06
@tanujnay112 tanujnay112 changed the base branch from graphite-base/5715 to task-api-integration October 22, 2025 09:06
Comment on lines 217 to 222
s3heap::client::GrpcHeapService::try_from_config(
&(config, system),
&registry,
)
.await
.expect("Failed to create test heap service client - ensure heap service is running on localhost:50052")

[**BestPractice**]

Test setup will panic if heap service is unavailable: The `expect()` will crash tests when the heap service isn't running on localhost:50052:

```rust
.expect("Failed to create test heap service client - ensure heap service is running on localhost:50052")
```

For tests, use a mock or make the test conditional:

```rust
let heap_service = s3heap::client::GrpcHeapService::try_from_config(
    &(config, system),
    &registry,
).await;

match heap_service {
    Ok(service) => service,
    Err(_) => {
        eprintln!("Skipping test - heap service not available");
        return;
    }
}
```

File: rust/worker/src/execution/operators/finish_task.rs
Line: 222
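
A variant of the "make the test conditional" idea is to probe the port before building the client, so the skip decision does not depend on how the client constructor reports failure. This is a minimal sketch using only the standard library; `service_reachable` is a hypothetical helper, and the address simply mirrors the one in the `expect` message.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Returns true if something accepts TCP connections at `addr` within `timeout`.
/// An address that does not parse as ip:port is treated as unreachable.
fn service_reachable(addr: &str, timeout: Duration) -> bool {
    match addr.parse::<SocketAddr>() {
        Ok(sock) => TcpStream::connect_timeout(&sock, timeout).is_ok(),
        Err(_) => false,
    }
}

fn main() {
    // 127.0.0.1:50052 mirrors the localhost:50052 address mentioned above.
    if !service_reachable("127.0.0.1:50052", Duration::from_millis(200)) {
        eprintln!("Skipping: heap service not reachable");
        return;
    }
    println!("Heap service reachable; the integration test body would run here");
}
```

A silent skip can hide a broken environment in CI, so a skip like this is probably best limited to local runs.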

Comment on lines +143 to +150
};

heap_service
.push(
vec![schedule],
&input.updated_task.input_collection_id.to_string(),
)
.await?;

[**BestPractice**]

Missing error handling for heap service operations:

```rust
heap_service
    .push(
        vec![schedule],
        &input.updated_task.input_collection_id.to_string(),
    )
    .await?;
```

If the heap service is unavailable or returns an error, this will cause the entire task finish operation to fail, potentially leaving the task in an inconsistent state. Consider handling heap service errors gracefully:

```rust
if let Err(e) = heap_service.push(vec![schedule], key).await {
    tracing::warn!("Failed to schedule next task run: {}", e);
    // Continue with task completion even if scheduling fails
}
```

File: rust/worker/src/execution/operators/finish_task.rs
Line: 150
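
Both options in this comment have a cost: propagating with `?` fails the whole finish operation, while logging and continuing can leave the next run unscheduled. A possible middle ground, not proposed anywhere in the PR, is a bounded retry that only then surfaces the error. The sketch below is synchronous for brevity, and `HeapPush`, `PushError`, `Flaky`, and `push_with_retry` are all hypothetical names rather than the s3heap API.

```rust
use std::fmt;

/// Hypothetical error type for a failed push.
#[derive(Debug)]
struct PushError(String);

impl fmt::Display for PushError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "push failed: {}", self.0)
    }
}

impl std::error::Error for PushError {}

/// Hypothetical stand-in for the heap client's push operation.
trait HeapPush {
    fn push(&mut self, schedule: &str) -> Result<(), PushError>;
}

/// Calls `push` up to `attempts` times and returns the last error if all fail.
fn push_with_retry<H: HeapPush>(
    heap: &mut H,
    schedule: &str,
    attempts: u32,
) -> Result<(), PushError> {
    let mut last = PushError("no attempts made".to_string());
    for _ in 0..attempts {
        match heap.push(schedule) {
            Ok(()) => return Ok(()),
            Err(e) => last = e,
        }
    }
    Err(last)
}

/// Test double that fails a fixed number of times before succeeding.
struct Flaky {
    failures_left: u32,
    pushed: Vec<String>,
}

impl HeapPush for Flaky {
    fn push(&mut self, schedule: &str) -> Result<(), PushError> {
        if self.failures_left > 0 {
            self.failures_left -= 1;
            return Err(PushError("service unavailable".to_string()));
        }
        self.pushed.push(schedule.to_string());
        Ok(())
    }
}

fn main() {
    let mut heap = Flaky { failures_left: 2, pushed: Vec::new() };
    match push_with_retry(&mut heap, "next-run", 3) {
        Ok(()) => println!("scheduled after retries: {:?}", heap.pushed),
        Err(e) => eprintln!("giving up: {e}"),
    }
}
```

Retrying a push makes the idempotency concern raised earlier in this review more pressing, since a retry after a lost response could enqueue the same schedule twice.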

@blacksmith-sh blacksmith-sh bot deleted a comment from tanujnay112 Oct 22, 2025
@tanujnay112 tanujnay112 requested a review from rescrv October 22, 2025 19:53
@tanujnay112 tanujnay112 changed the base branch from task-api-integration to graphite-base/5715 October 24, 2025 21:09
@tanujnay112 tanujnay112 changed the base branch from graphite-base/5715 to task-api-integration October 24, 2025 21:21
Comment on lines 645 to 647
let endpoint_res = match Endpoint::from_shared(format!(
"grpc://rust-log-service-{ordinal}.rust-log-service:50051"
"grpc://rust-log-service-{ordinal}.rust-log-service:{port}"
)) {

[**BestPractice**]

For consistency with Tonic best practices and other parts of the codebase that create gRPC endpoints, use the `http://` scheme for plaintext connections. While `grpc://` may work with some gRPC implementations, `http://` is the standard scheme for plaintext gRPC connections in Tonic, as documented in official examples and used throughout the Tonic ecosystem. For TLS connections, use `https://`.

```suggestion
        let endpoint_res = match Endpoint::from_shared(format!(
            "http://rust-log-service-{ordinal}.rust-log-service:{port}"
        )) {
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/log/src/grpc_log.rs
Line: 647
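
To keep the scheme choice in one place, the URI could be built by a small helper that picks `http` or `https` from a TLS flag. The following sketch assumes a hypothetical `log_service_uri` function and a `tls` flag that does not appear in the PR; the host pattern is copied from the snippet under review. The resulting string is what would be handed to `Endpoint::from_shared`.

```rust
/// Builds the endpoint URI for one log-service replica.
/// `tls` is a hypothetical flag: false selects plaintext (`http`), true selects `https`.
fn log_service_uri(ordinal: usize, port: u16, tls: bool) -> String {
    let scheme = if tls { "https" } else { "http" };
    format!("{scheme}://rust-log-service-{ordinal}.rust-log-service:{port}")
}

fn main() {
    // Plaintext endpoint for replica 0 on the port used in the original code.
    println!("{}", log_service_uri(0, 50051, false));
}
```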

Thank you, propel.

@tanujnay112 tanujnay112 force-pushed the heap_tender_client branch 3 times, most recently from 7e3a65c to d2c1a19 Compare October 24, 2025 23:31
@tanujnay112 tanujnay112 requested a review from rescrv October 24, 2025 23:32

1,
1000,
1000,
50051,

I'm fine with it.

}

/// Push schedules to the heap
#[tracing::instrument(skip(self, schedules))]

Our pattern in most places is to have the client convert to local types. These methods request and respond with chroma_proto types, which is fine if we're not going to parse or validate them.
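
For reference, the "convert to local types" pattern usually amounts to a `TryFrom` implementation at the client boundary, so validation happens once and callers never see wire types. The sketch below uses made-up types (`ProtoSchedule`, `Schedule`, `ConversionError`); none of them correspond to the actual chroma_proto definitions.

```rust
use std::convert::TryFrom;

/// Hypothetical wire type, shaped like a generated protobuf struct.
#[derive(Debug, Clone)]
struct ProtoSchedule {
    task_id: String,
    next_run_unix_secs: i64,
}

/// Hypothetical local type with the invariants already checked.
#[derive(Debug, PartialEq)]
struct Schedule {
    task_id: String,
    next_run_secs: u64,
}

#[derive(Debug, PartialEq)]
enum ConversionError {
    EmptyTaskId,
    NegativeTimestamp(i64),
}

impl TryFrom<ProtoSchedule> for Schedule {
    type Error = ConversionError;

    fn try_from(p: ProtoSchedule) -> Result<Self, Self::Error> {
        if p.task_id.is_empty() {
            return Err(ConversionError::EmptyTaskId);
        }
        // Reject negative timestamps instead of wrapping them into a huge u64.
        let secs = p.next_run_unix_secs;
        let next_run_secs =
            u64::try_from(secs).map_err(|_| ConversionError::NegativeTimestamp(secs))?;
        Ok(Schedule { task_id: p.task_id, next_run_secs })
    }
}

fn main() {
    let wire = ProtoSchedule { task_id: "task-1".to_string(), next_run_unix_secs: 1_700_000_000 };
    println!("{:?}", Schedule::try_from(wire));
}
```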

Comment on lines +2073 to +2095
let heap_service = GrpcHeapService::try_from_config(
&(GrpcHeapServiceConfig::default(), system.clone()),
&registry,
)
.await
.expect("Should connect to grpc heap service");

The test_k8s_integration_ prefix signals that the test is meant to be run with these services running.

@tanujnay112 tanujnay112 changed the base branch from task-api-integration to graphite-base/5715 October 27, 2025 02:07
@tanujnay112 tanujnay112 changed the base branch from graphite-base/5715 to task-api-integration October 27, 2025 02:45
@tanujnay112 tanujnay112 changed the base branch from task-api-integration to graphite-base/5715 October 27, 2025 18:27
@graphite-app graphite-app bot changed the base branch from graphite-base/5715 to main October 27, 2025 18:27
Comment on lines +200 to +205
return Ok(GrpcHeapService::new(
my_config.clone(),
client_assigner,
client_manager_handle,
memberlist_provider_handle,
));

[**BestPractice**]

The `return` keyword is redundant here as this is the last expression in the function. It can be removed for idiomatic Rust.

File: rust/s3heap-service/src/client/grpc.rs
Line: 205
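
As a small illustration of the style point, the two functions below behave identically; only the final line differs. `Client`, `build_with_return`, and `build_idiomatic` are hypothetical names chosen for the example. Note that `return` is still needed for the early exit, just not on the tail expression.

```rust
#[derive(Debug, PartialEq)]
struct Client {
    port: u16,
}

/// Style flagged in the review: explicit `return` on the last expression.
#[allow(clippy::needless_return)]
fn build_with_return(port: u16) -> Result<Client, String> {
    if port == 0 {
        return Err("port must be nonzero".to_string());
    }
    return Ok(Client { port });
}

/// Idiomatic form: the tail expression is the function's value.
fn build_idiomatic(port: u16) -> Result<Client, String> {
    if port == 0 {
        // An early exit still requires `return`.
        return Err("port must be nonzero".to_string());
    }
    Ok(Client { port })
}

fn main() {
    println!("{:?}", build_with_return(50052));
    println!("{:?}", build_idiomatic(50052));
}
```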

@tanujnay112 tanujnay112 merged commit b253069 into main Oct 27, 2025
176 of 182 checks passed