Conversation

@tanujnay112 tanujnay112 commented Dec 9, 2025

Description of changes

Summarize the changes made by this PR.

  • Improvements & Bug fixes
    • The compactor previously kept a DLQ per node. This change adds a new compaction_failure_count column to the collections table to implement a global DLQ (see the sketch after this list).
    • The in-memory failing_jobs map has been removed.
    • The get_dead_jobs() endpoint has been removed, as this information can now be derived from the SysDB.
  • New functionality
    • Restarting compactor nodes no longer resets DLQ state.
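
A minimal sketch of the global DLQ check, with hypothetical names and a simplified record type (the real threshold and types live in the compactor scheduler). The key point is that the failure count is read from SysDB, so every node sees the same DLQ state:

```rust
// Hedged sketch: MAX_FAILURE_COUNT and CollectionRecord are assumptions,
// not the real scheduler types.
const MAX_FAILURE_COUNT: i32 = 5; // assumed threshold

struct CollectionRecord {
    id: String,
    compaction_failure_count: i32, // persisted column added by this PR
}

fn is_dead_lettered(collection: &CollectionRecord) -> bool {
    collection.compaction_failure_count >= MAX_FAILURE_COUNT
}

fn schedulable(collections: Vec<CollectionRecord>) -> Vec<CollectionRecord> {
    collections
        .into_iter()
        .filter(|c| !is_dead_lettered(c))
        .collect()
}
```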

Test plan

How are these changes tested?

test_k8s_integration_scheduler already tests this feature.

  • Tests pass locally with pytest for Python, yarn test for JS, and cargo test for Rust

Migration plan

Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?

The new column on the collections table defaults to 0 in the database, the Go model, and the proto definitions.
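
As a sketch of why this deploys cleanly (struct and field names assumed for illustration): an i32 defaults to 0 in Rust, Go, and proto3 alike, so rows created before the migration behave as if they have never failed compaction:

```rust
// Minimal sketch: an unset count decodes as 0, the same default the
// migration backfills, so pre- and post-migration rows are indistinguishable.
#[derive(Debug, Default)]
struct Collection {
    name: String,
    compaction_failure_count: i32, // i32::default() == 0
}

fn main() {
    let pre_migration_row = Collection::default();
    assert_eq!(pre_migration_row.compaction_failure_count, 0);
}
```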

Observability plan

What is the plan to instrument and monitor this change?

The DLQ is more easily observable from the SysDB now.
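
For example, an operator can read DLQ state straight out of storage. A hedged sketch assuming sqlx and the Postgres schema (only the column name comes from this PR; everything else is an assumption):

```rust
use sqlx::PgPool;

// Returns collections currently accumulating failures; a sufficiently
// high count means the collection is effectively dead-lettered.
async fn failing_collections(pool: &PgPool) -> sqlx::Result<Vec<(String, i32)>> {
    sqlx::query_as::<_, (String, i32)>(
        "SELECT id, compaction_failure_count FROM collections \
         WHERE compaction_failure_count > 0",
    )
    .fetch_all(pool)
    .await
}
```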

compactor_job_failure_count replaces the compactor_dead_jobs_count metric; we increment it on every failure rather than only when a job is "killed".
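
A sketch of the intended increment semantics, using a plain atomic as a stand-in for whatever metrics backend the worker actually uses (the metric name comes from this PR; the mechanism is assumed):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in counter: incremented on every compaction failure, not just
// when a job crosses the threshold and is dead-lettered.
static COMPACTOR_JOB_FAILURE_COUNT: AtomicU64 = AtomicU64::new(0);

fn on_compaction_failure() {
    COMPACTOR_JOB_FAILURE_COUNT.fetch_add(1, Ordering::Relaxed);
}
```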

Documentation Changes

Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?

github-actions bot commented Dec 9, 2025

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of unexpectedly high quality (readability, modularity, intuitiveness)?

This stack of pull requests is managed by Graphite. Learn more about stacking.

@tanujnay112 tanujnay112 marked this pull request as ready for review December 9, 2025 23:39

propel-code-bot bot commented Dec 9, 2025

Persist dead-letter state for compaction jobs in SysDB

Introduces a global DLQ by persisting compaction failure counts per collection in SysDB and wiring the compactor to use that state. The change removes in-memory failure tracking, adds a compaction_failure_count column to both Postgres and SQLite schemas, exposes an increment RPC, and resets counters on successful flushes so nodes share DLQ state across restarts.

Key Changes

• Replace the scheduler’s in-memory failing_jobs/dead_jobs logic with SysDB-backed counts and convert the job tracking helpers (fail_job, is_job_in_progress, schedule_internal) to async so they can await SysDB updates (see the sketch after this list).
• Add IncrementCompactionFailureCount RPC, DAO support, and migrations; the DAO resets counts on success and uses gRPC/sql to bump counts, while protobuf/go/rust bindings now include the new column and RPC.
• Extend compaction tests to cover the max failure skip path, update metrics (compactor_job_failure_count counter), and drop the legacy get_dead_jobs endpoint.
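
A hedged sketch of the async conversion (trait and signatures are hypothetical stand-ins for the real SysDB client): fail_job now awaits a SysDB RPC instead of mutating a local map, which is why its callers had to become async too:

```rust
// Hypothetical stand-ins for the real SysDB client and error type.
trait SysDb {
    async fn increment_compaction_failure_count(
        &self,
        collection_id: String,
    ) -> Result<(), String>;
}

// Persist the failure in SysDB; no in-memory failing_jobs map involved.
async fn fail_job<S: SysDb>(sysdb: &S, collection_id: String) -> Result<(), String> {
    sysdb.increment_compaction_failure_count(collection_id).await
}
```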

Affected Areas

• rust/worker/src/compactor scheduler & manager
• rust/sysdb client APIs and SQLite implementation
• go/pkg/sysdb coordinator/DAO and migrations
• Protobuf definitions & generated bindings

This summary was automatically generated by @propel-code-bot


In rust/sysdb/src/sqlite.rs:

```rust
lineage_file_path: None,
updated_at: SystemTime::UNIX_EPOCH,
database_id,
compaction_failure_count: 0,
```

Important

[Logic] The compaction_failure_count is hardcoded to 0 here. It should be fetched from the database row so that it reflects the persisted failure count for the collection. As written, the DLQ logic will not work as intended for the SQLite backend, because it will always report 0 failures.

To fix this, you'll need to:

  1. Update the sea_query builder in get_collections_with_conn to select the compaction_failure_count column.
  2. Parse the value from the SqliteRow and use it when constructing the Collection struct.

This will likely also require changes in chroma-sqlite/src/table.rs to add CompactionFailureCount to the Collections enum, and an update to the SQLite schema creation logic to include the new column.
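
For instance, the table.rs addition might look like this (a sketch; the existing enum variants are elided):

```rust
use sea_query::Iden;

// Hypothetical sketch of the Collections enum with the new column;
// `CompactionFailureCount` renders as "compaction_failure_count".
#[derive(Iden)]
pub enum Collections {
    Table,
    // ...existing columns...
    CompactionFailureCount,
}
```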

Example of what needs to change in get_collections_with_conn:

```rust
// In the query builder
// ...
.column((table::Collections::Table, table::Collections::SchemaStr))
.column((table::Collections::Table, table::Collections::CompactionFailureCount)) // Add this
// ...

// In the row processing loop
// ...
let compaction_failure_count: i32 = first_row.get("compaction_failure_count"); // Get the value
// ...
Some(Ok(Collection {
    // ...
    compaction_failure_count, // Use the fetched value
}))
// ...
```

File: rust/sysdb/src/sqlite.rs
Line: 864

@tanujnay112 tanujnay112 force-pushed the dlq_sysdb branch 2 times, most recently from 45971d3 to cbc5830 on December 15, 2025 18:38