Conversation

@hongyunyan
Collaborator

@hongyunyan hongyunyan commented Jan 5, 2026

What problem does this PR solve?

Issue Number: ref #3848

What is changed and how it works?

This pull request lays the foundation for checksum validation of heartbeats. It defines new protobuf structures and introduces a dedicated nodeSetChecksumManager, which lets the maintainer track the expected dispatcher set of each node and verify that the sets stay consistent across nodes. Detecting such discrepancies early improves data integrity and operational stability in the distributed environment.

Highlights

  • New Protobuf Definitions: Introduced new Protobuf definitions for ChecksumState enum and messages like ChecksumMeta, DispatcherSetChecksum, DispatcherSetChecksumAckResponse, and DispatcherSetChecksumUpdateRequest to support checksum validation.
  • Checksum Manager Implementation: Implemented nodeSetChecksumManager to manage expected dispatcher set checksums on the maintainer side, including logic for applying incremental changes, flushing updates, handling acknowledgments, and resending messages.
  • Messaging System Integration: Integrated the new checksum-related message types into the internal messaging system by adding corresponding IOType constants and handling their encoding/decoding.
  • Typo Correction: Corrected a widespread typo from DefaultKeyspaceNamme to DefaultKeyspaceName across various files, improving code consistency.
  • Unit Tests: Added comprehensive unit tests for the nodeSetChecksumManager to ensure the reliability of checksum tracking and communication logic.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

@ti-chi-bot ti-chi-bot bot added the release-note and size/XXL labels Jan 5, 2026
@gemini-code-assist

Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the foundational groundwork for robust checksum validation within the system. It defines the necessary data structures and communication protocols for tracking and verifying dispatcher set checksums across different nodes. The changes enable the maintainer to manage the expected state of dispatcher sets and react to discrepancies reported via heartbeats, which is crucial for ensuring data consistency and operational stability in a distributed environment.

Highlights

  • New Protobuf Definitions: Introduced new protobuf messages and an enum for ChecksumState, ChecksumMeta, DispatcherSetChecksum, DispatcherSetChecksumAckResponse, and DispatcherSetChecksumUpdateRequest to facilitate checksum validation.
  • Checksum Manager Implementation: Added nodeSetChecksumManager to manage expected dispatcher set checksums on the maintainer side, including logic for applying deltas, flushing dirty states, handling acknowledgments, and resending updates.
  • Messaging System Integration: Integrated the new checksum update and acknowledgment messages into the internal messaging system by adding new IOType constants.
  • Unit Tests: Included unit tests for the nodeSetChecksumManager to ensure correct functionality of checksum tracking, resending, acknowledgment, and node removal.
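
The acknowledgment and resend behavior named in the highlights above can be pictured with a minimal sketch. Everything in it (the `tracker` and `pendingUpdate` types, the method names, and the one-second resend interval) is hypothetical and not taken from this PR; it only illustrates keeping one sequence-numbered update per node until a matching ack arrives.

```go
package main

import (
	"fmt"
	"time"
)

// pendingUpdate is a hypothetical record of the latest update sent to one node.
type pendingUpdate struct {
	seq      uint64
	lastSent time.Time
	acked    bool
}

// tracker is a hypothetical, single-goroutine stand-in for per-node send state.
type tracker struct {
	nextSeq uint64
	pending map[string]*pendingUpdate
}

func newTracker() *tracker {
	return &tracker{pending: make(map[string]*pendingUpdate)}
}

// send records a new update for node and returns its sequence number.
func (t *tracker) send(node string, now time.Time) uint64 {
	t.nextSeq++
	t.pending[node] = &pendingUpdate{seq: t.nextSeq, lastSent: now}
	return t.nextSeq
}

// ack marks the update acknowledged only if seq matches the latest one sent.
func (t *tracker) ack(node string, seq uint64) bool {
	p, ok := t.pending[node]
	if !ok || p.seq != seq {
		return false // stale or unknown ack is ignored
	}
	p.acked = true
	return true
}

// resend returns the nodes whose latest update is still unacknowledged
// and was last sent at least interval ago.
func (t *tracker) resend(now time.Time, interval time.Duration) []string {
	var nodes []string
	for node, p := range t.pending {
		if !p.acked && now.Sub(p.lastSent) >= interval {
			p.lastSent = now
			nodes = append(nodes, node)
		}
	}
	return nodes
}

// demo sends one update, lets it time out, acks it, and checks again.
func demo() (firstResend int, acked bool, secondResend int) {
	start := time.Unix(0, 0)
	t := newTracker()
	seq := t.send("node-1", start)
	firstResend = len(t.resend(start.Add(2*time.Second), time.Second))
	acked = t.ack("node-1", seq)
	secondResend = len(t.resend(start.Add(5*time.Second), time.Second))
	return
}

func main() {
	fmt.Println(demo())
}
```

Matching the ack against the latest sequence number is what lets a late ack for an older update be dropped safely in this sketch.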

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces the foundational components for checksum validation in heartbeats, including new protobuf definitions and a nodeSetChecksumManager to manage checksum state on the maintainer side. The code is well-structured and provides a solid base for the feature. I have identified a few areas for improvement, such as potentially dead code, a redundant check, and a minor style issue in the protobuf definition. Additionally, a typo was found in a constant used within the new test files.

  int64 mode = 3;
  uint64 seq = 4;
  DispatcherSetChecksum checksum = 5;
}

medium

The file should end with a newline character. It's a common convention and some tools might expect it.

Comment on lines 186 to 206
if exists {
	if oldCapture == capture {
		log.Warn("dispatcher already exists in expected set, ignore it",
			zap.Stringer("changefeedID", m.changefeedID),
			zap.String("dispatcherID", id.String()),
			zap.String("capture", capture.String()),
			zap.String("mode", common.StringMode(m.mode)),
		)
		continue
	}
	log.Warn("dispatcher exists in another capture, override expected node",
		zap.Stringer("changefeedID", m.changefeedID),
		zap.String("dispatcherID", id.String()),
		zap.String("oldCapture", oldCapture.String()),
		zap.String("newCapture", capture.String()),
		zap.String("mode", common.StringMode(m.mode)),
	)
	if oldState, ok := m.state.captures[oldCapture]; ok {
		oldState.checksum.Remove(id)
	}
}

medium

This if exists block appears to be dead code. applyExpectedSet is only called from ResetAndSendFull, which resets m.state just before this call. Consequently, m.state.dispatcherToNode will be empty, and exists will always evaluate to false.

If this logic is intended for future use, please add a comment to clarify. Otherwise, it should be removed to prevent confusion.

if exists && oldCapture == capture {
	continue
}
if exists && oldCapture != "" && oldCapture != capture {

medium

The check oldCapture != "" seems redundant. If exists is true, oldCapture should already be a valid, non-empty capture ID. This check can be removed to simplify the code.

Suggested change
- if exists && oldCapture != "" && oldCapture != capture {
+ if exists && oldCapture != capture {


func TestDispatcherSetChecksumResendAndAck(t *testing.T) {
	mgr := newNodeSetChecksumManager(
		common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme),

medium

There appears to be a typo in the constant common.DefaultKeyspaceNamme. It should likely be common.DefaultKeyspaceName. Please consider correcting the constant definition in pkg/common/types.go.


func TestCaptureSetChecksumManagerRemoveNodesCleansState(t *testing.T) {
	mgr := newNodeSetChecksumManager(
		common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme),

medium

There appears to be a typo in the constant common.DefaultKeyspaceNamme. It should likely be common.DefaultKeyspaceName. Please consider correcting the constant definition in pkg/common/types.go.

@hongyunyan
Collaborator Author

/gemini review

@hongyunyan
Collaborator Author

/gemini summary

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a checksum validation mechanism for heartbeats to enhance data integrity, alongside fixing a widespread typo (DefaultKeyspaceNamme to DefaultKeyspaceName). The core of the new functionality lies in the new node_set_checksum_manager.go, which manages checksums for node sets, and the corresponding protobuf definitions. My review identifies a critical issue in how protobuf messages are compared, which could lead to incorrect behavior, and also points out some minor inconsistencies and typos for improvement.

Comment on lines 303 to 305
if ack.ChangefeedID != m.changefeedID.ToPB() {
	return
}

critical

The comparison ack.ChangefeedID != m.changefeedID.ToPB() is incorrect because it compares the memory addresses of the two pointers, not their contents. This will likely always evaluate to true, causing valid acknowledgements to be ignored. You should use proto.Equal to perform a deep comparison of protobuf messages.

You'll also need to add "github.com/gogo/protobuf/proto" to your imports for this change.

Suggested change
- if ack.ChangefeedID != m.changefeedID.ToPB() {
- 	return
- }
+ if !proto.Equal(ack.ChangefeedID, m.changefeedID.ToPB()) {
+ 	return
+ }
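
The difference between pointer comparison and content comparison can be shown without the protobuf dependency. In the sketch below, `changefeedIDPB`, `changefeedID`, and `equalPB` are hypothetical stand-ins (not the PR's types); `equalPB` plays the role that proto.Equal plays for real generated messages, under the assumption that ToPB allocates a new message on each call.

```go
package main

import "fmt"

// changefeedIDPB is a hypothetical stand-in for a generated protobuf message.
type changefeedIDPB struct {
	High, Low uint64
	Name      string
}

// changefeedID is a hypothetical in-memory ID type.
type changefeedID struct {
	high, low uint64
	name      string
}

// ToPB allocates a new message on every call, so two results never share an address.
func (c changefeedID) ToPB() *changefeedIDPB {
	return &changefeedIDPB{High: c.high, Low: c.low, Name: c.name}
}

// equalPB compares message contents instead of pointer addresses.
func equalPB(a, b *changefeedIDPB) bool {
	if a == nil || b == nil {
		return a == b
	}
	return *a == *b
}

func main() {
	id := changefeedID{high: 1, low: 2, name: "test"}
	ack := id.ToPB()

	// Pointer comparison: different allocations, so != is true even for equal contents.
	fmt.Println("pointers differ:", ack != id.ToPB())
	// Content comparison: the fields are identical.
	fmt.Println("contents equal:", equalPB(ack, id.ToPB()))
}
```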

@gemini-code-assist

Summary of Changes

Changelog
  • api/v2/changefeed.go
    • Corrected a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • cmd/kafka-consumer/writer.go
    • Fixed a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • cmd/pulsar-consumer/writer.go
    • Corrected a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • cmd/storage-consumer/consumer.go
    • Fixed a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • downstreamadapter/sink/redo/meta.go
    • Corrected a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • heartbeatpb/heartbeat.pb.go
    • Generated Go code for new protobuf definitions, including ChecksumState enum, ChecksumMeta, DispatcherSetChecksum, DispatcherSetChecksumAckResponse, and DispatcherSetChecksumUpdateRequest messages, along with their serialization methods.
  • heartbeatpb/heartbeat.proto
    • Added new protobuf definitions for ChecksumState enum, ChecksumMeta (for checksum state and sequence), DispatcherSetChecksum (for count, xor, and sum of dispatcher IDs), DispatcherSetChecksumAckResponse, and DispatcherSetChecksumUpdateRequest.
  • maintainer/node_set_checksum_manager.go
    • Introduced nodeSetChecksumManager to manage expected dispatcher set checksums per node, handling state tracking, delta application, dirty state flushing, acknowledgment processing, and resending unacknowledged updates.
  • maintainer/node_set_checksum_manager_test.go
    • Added unit tests for nodeSetChecksumManager covering scenarios like resending unacknowledged updates and proper state cleanup upon node removal.
  • maintainer/operator/operator_controller.go
    • Corrected typos in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName in metrics labels.
  • pkg/common/types.go
    • Corrected the constant name DefaultKeyspaceNamme to DefaultKeyspaceName and updated its usage.
  • pkg/etcd/etcd.go
    • Fixed a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • pkg/keyspace/keyspace_manager.go
    • Corrected a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • pkg/messaging/message.go
    • Added new IOType constants (TypeDispatcherSetChecksumUpdateRequest, TypeDispatcherSetChecksumAckResponse) and updated message decoding/creation logic to support the new checksum messages.
  • pkg/migrate/migrate.go
    • Fixed typos in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • pkg/redo/common/util.go
    • Corrected a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • pkg/redo/writer/file/file.go
    • Fixed a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • pkg/redo/writer/memory/file_worker.go
    • Corrected a typo in common.DefaultKeyspaceNamme to common.DefaultKeyspaceName.
  • pkg/set_checksum/set_checksum.go
    • New file defining a Checksum struct for order-independent, incrementally updatable checksums using count, XOR, and sum of 128-bit dispatcher ID components, along with methods for adding, removing, comparing, and converting to/from protobuf.
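
The order-independent checksum described for pkg/set_checksum/set_checksum.go can be sketched as follows. This is an illustration under assumptions, not the PR's code: the `dispatcherID` and `checksum` types and their field names are hypothetical, and the sum is kept per 64-bit half with wrapping arithmetic rather than as a carried 128-bit sum.

```go
package main

import "fmt"

// dispatcherID is a hypothetical 128-bit ID split into two 64-bit halves.
type dispatcherID struct {
	High, Low uint64
}

// checksum is order-independent because count, XOR, and wrapping addition
// all commute; each can also be undone, so updates are incremental.
type checksum struct {
	Count   uint64
	XorHigh uint64
	XorLow  uint64
	SumHigh uint64
	SumLow  uint64
}

// Add folds one ID into the checksum.
func (c *checksum) Add(id dispatcherID) {
	c.Count++
	c.XorHigh ^= id.High
	c.XorLow ^= id.Low
	c.SumHigh += id.High // wraps on overflow, which is fine for a checksum
	c.SumLow += id.Low
}

// Remove undoes a previous Add of the same ID.
func (c *checksum) Remove(id dispatcherID) {
	c.Count--
	c.XorHigh ^= id.High // XOR is its own inverse
	c.XorLow ^= id.Low
	c.SumHigh -= id.High
	c.SumLow -= id.Low
}

func main() {
	a := dispatcherID{High: 1, Low: 2}
	b := dispatcherID{High: 3, Low: 4}
	c := dispatcherID{High: 5, Low: 6}

	var x, y checksum
	// x sees a, b, c and then drops b; y sees c, a in a different order.
	x.Add(a)
	x.Add(b)
	x.Add(c)
	x.Remove(b)
	y.Add(c)
	y.Add(a)

	fmt.Println("same set, same checksum:", x == y)
}
```

Combining a count, an XOR, and a sum makes accidental collisions less likely than any one of them alone: XOR by itself, for example, cannot tell an empty set from one that holds the same ID twice.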
Activity
  • A bot indicated that the PR is not yet approved and listed required approvals.
  • Gemini Code Assist provided an initial summary of changes.
  • The author requested a review and summary from Gemini Code Assist.
  • A bot reported several failed tests (pull-check, pull-unit-test-next-gen, pull-unit-test).
  • Gemini Code Assist provided several review comments, including suggestions for adding a newline character, clarifying potential dead code, removing redundant checks, correcting multiple instances of a typo (common.DefaultKeyspaceNamme), addressing a critical issue with incorrect protobuf message comparison (!= instead of proto.Equal), and aligning a comment with its message name in heartbeatpb/heartbeat.proto.

hongyunyan and others added 5 commits January 6, 2026 11:51
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@hongyunyan
Collaborator Author

/test all

@ti-chi-bot

ti-chi-bot bot commented Jan 6, 2026

@hongyunyan: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name | Commit | Details | Required | Rerun command
pull-cdc-mysql-integration-heavy | d9f0c37 | link | true | /test pull-cdc-mysql-integration-heavy

) (shouldWarn bool, duration time.Duration) {
	if state == heartbeatpb.ChecksumState_MATCH {
		s.lastObservedState = state
		s.nonMatchSince = time.Time{}
Collaborator

Is time.Time{} a standard way to initialize a time.Time?

How about calling time.Now() once at the very beginning and then assigning it to all related fields?

Collaborator Author

Yes, and it's more like a reset action
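
For context on the exchange above: time.Time{} is the zero value of time.Time, and the standard library's IsZero method reports whether a value is that zero instant. A minimal sketch of using it as a "no timer running" marker follows; the helper names `resetTimer` and `isRunning` are hypothetical and not from this PR.

```go
package main

import (
	"fmt"
	"time"
)

// resetTimer is a hypothetical helper: assigning the zero value marks
// the timer as "not running".
func resetTimer(t *time.Time) {
	*t = time.Time{}
}

// isRunning treats any non-zero time as an active timer.
func isRunning(t time.Time) bool {
	return !t.IsZero()
}

// demo reports the timer state before and after a reset.
func demo() (before, after bool) {
	since := time.Now()
	before = isRunning(since)
	resetTimer(&since)
	return before, isRunning(since)
}

func main() {
	before, after := demo()
	fmt.Println("running before reset:", before)
	fmt.Println("running after reset:", after)
}
```

Setting the field to time.Now() instead would make a freshly reset timer look identical to one that had just started, which is why the zero value works as a reset marker.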

}

needResetTimer := s.lastObservedState == heartbeatpb.ChecksumState_MATCH || s.lastObservedState != state
if needResetTimer || s.nonMatchSince.IsZero() {
Collaborator

|| s.nonMatchSince.IsZero() can be combined into the condition check above.
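
One way to read this suggestion is sketched below. The `observer` type, the `checksumState` constants, and the early return in the MATCH branch are assumptions made for illustration; the real function may differ. Under that assumption, the state is never MATCH once the first branch is passed, so "last state was MATCH" already implies "last state differs from the current one", and the whole reset condition collapses into a single expression.

```go
package main

import (
	"fmt"
	"time"
)

type checksumState int

const (
	stateMatch checksumState = iota
	stateMismatch
	stateUninitialized
)

// observer is a hypothetical stand-in for the per-node state in the snippet above.
type observer struct {
	lastObservedState checksumState
	nonMatchSince     time.Time
}

// observe returns how long the current non-match state has lasted.
func (o *observer) observe(state checksumState, now time.Time) time.Duration {
	if state == stateMatch {
		o.lastObservedState = state
		o.nonMatchSince = time.Time{} // reset: no non-match in progress
		return 0
	}
	// Combined condition: restart the timer when the state changed
	// or when no timer is running yet.
	if o.lastObservedState != state || o.nonMatchSince.IsZero() {
		o.nonMatchSince = now
	}
	o.lastObservedState = state
	return now.Sub(o.nonMatchSince)
}

// demo feeds a fixed sequence of states and returns the durations in seconds.
func demo() []int {
	start := time.Unix(0, 0)
	o := &observer{}
	steps := []struct {
		state  checksumState
		offset time.Duration
	}{
		{stateMismatch, 0},
		{stateMismatch, 3 * time.Second},
		{stateUninitialized, 4 * time.Second},
		{stateMatch, 5 * time.Second},
		{stateMismatch, 9 * time.Second},
	}
	var secs []int
	for _, s := range steps {
		d := o.observe(s.state, start.Add(s.offset))
		secs = append(secs, int(d/time.Second))
	}
	return secs
}

func main() {
	fmt.Println(demo())
}
```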

}
for _, id := range ids {
	oldNode, exists := m.state.dispatcherToNode[id]
	if exists {
Collaborator

Handle the !exists case first and just continue.

@ti-chi-bot ti-chi-bot bot added the needs-1-more-lgtm label Jan 6, 2026
@ti-chi-bot

ti-chi-bot bot commented Jan 6, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-01-06 06:29:57 UTC: ☑️ agreed by 3AceShowHand.

@ti-chi-bot ti-chi-bot bot added the approved label Jan 6, 2026

	return
}

stateStr := "mismatch"
Collaborator

It is better to use a constant here.

@ti-chi-bot

ti-chi-bot bot commented Jan 7, 2026

@3AceShowHand: Your lgtm message is repeated, so it is ignored.

@ti-chi-bot

ti-chi-bot bot commented Jan 7, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand

ChangefeedID changefeedID = 1;
uint64 epoch = 2;
int64 mode = 3;
uint64 seq = 4;
Collaborator

Why do we need both epoch and seq? Wouldn't a single seq be sufficient?

m.mu.Lock()
nodeState, ok := m.state.nodes[from]
if !ok {
	m.mu.Unlock()
Collaborator

Use defer m.mu.Unlock() above.

	changefeed: m.changefeedID,
	node:       from,
}
m.mu.Unlock()
Collaborator

Use defer m.mu.Unlock() above.
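
The pattern the reviewer refers to can be sketched as below. The `manager` type and `seqFor` method are hypothetical and only mirror the shape of the snippets above: a lock taken at the top, an early return when the node is unknown, and a normal return otherwise.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for a mutex-guarded manager.
type manager struct {
	mu    sync.Mutex
	nodes map[string]uint64
}

// seqFor returns the stored sequence number for node.
// The deferred Unlock runs on every return path, so no branch
// needs its own explicit m.mu.Unlock().
func (m *manager) seqFor(node string) (uint64, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	seq, ok := m.nodes[node]
	if !ok {
		return 0, false // early return, lock still released by defer
	}
	return seq, true
}

func main() {
	m := &manager{nodes: map[string]uint64{"node-1": 7}}
	fmt.Println(m.seqFor("node-1"))
	fmt.Println(m.seqFor("node-2"))
}
```

A trade-off to keep in mind: defer holds the lock until the function returns. If the original code unlocks early on purpose, for example to log or send messages outside the critical section, switching to defer widens the locked region.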

	return msgs
}

func (m *nodeSetChecksumManager) applyExpectedSet(expected map[node.ID][]common.DispatcherID) {
Collaborator

Add a comment noting that the caller must hold the lock before calling this function.
