@wirybeaver wirybeaver commented Nov 19, 2025

This is the first part of a series of changes aimed at enabling real-time table replication between two Pinot clusters.

The core purpose of this PR is to let a table copy operation create a new real-time table whose consuming-segment watermarks are derived from the source table's ZK metadata, and which is assigned to a designated broker / server tenant.

The key changes introduced by the 3 commits are:

  • Copy table with designated tenant and watermarks: Extends the table copy mechanism so that the caller can specify a designated tenant and watermarks.
  • Create first consuming segments per partition: Adds logic that creates the initial consuming segment of each partition from the provided watermarks.
  • Derive the watermarks from the consuming status: Reuses the existing PinotHelixResourceManager API that fetches the current consuming status per partition. That API reads the table's IdealState and returns the last segment sequence number per partition by parsing the segment names.
    • If the segment status is IN_PROGRESS, the segment is treated as the CONSUMING segment, so its startOffset and sequence number are copied as the starting consuming position of that partition in the new table.
    • If the segment status is DONE, the consuming segment is the one after it, so its endOffset and 1 + sequence_number are used to initialize the consuming segment in the new table.
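
The two status rules above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `WatermarkSketch`, `LatestSegment`, `Watermark` and `derive` are hypothetical names, and offsets are simplified to `long` values, whereas the real stream offsets may be represented differently.

```java
/** Hypothetical sketch of the watermark rule; none of these names are Pinot classes. */
final class WatermarkSketch {

  /** Status of the latest segment of a partition, as read from the source table's metadata. */
  enum SegmentStatus { IN_PROGRESS, DONE }

  /** Simplified view of the latest segment of one partition (assumed shape). */
  static final class LatestSegment {
    final int partitionId;
    final int sequenceNumber;
    final long startOffset;
    final long endOffset; // only meaningful when status is DONE
    final SegmentStatus status;

    LatestSegment(int partitionId, int sequenceNumber, long startOffset, long endOffset,
        SegmentStatus status) {
      this.partitionId = partitionId;
      this.sequenceNumber = sequenceNumber;
      this.startOffset = startOffset;
      this.endOffset = endOffset;
      this.status = status;
    }
  }

  /** Starting position of the first consuming segment in the new table. */
  static final class Watermark {
    final int partitionId;
    final int sequenceNumber;
    final long startOffset;

    Watermark(int partitionId, int sequenceNumber, long startOffset) {
      this.partitionId = partitionId;
      this.sequenceNumber = sequenceNumber;
      this.startOffset = startOffset;
    }
  }

  static Watermark derive(LatestSegment latest) {
    switch (latest.status) {
      case IN_PROGRESS:
        // The latest segment is itself the consuming segment: reuse its position as-is.
        return new Watermark(latest.partitionId, latest.sequenceNumber, latest.startOffset);
      case DONE:
        // The consuming segment is the next one: start at the end offset, bump the sequence.
        return new Watermark(latest.partitionId, latest.sequenceNumber + 1, latest.endOffset);
      default:
        throw new IllegalArgumentException("Unexpected status: " + latest.status);
    }
  }
}
```

For example, under these assumptions a partition whose latest segment is DONE with sequence number 7 and endOffset 250 would start consuming in the new table at sequence number 8 and offset 250.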

The following features are not supported at the moment:

  • Upsert / offline table replication.
  • Shallow copy of deep segment.
  • Auto-pausing the table if the consuming segment's start offset has expired in the Kafka broker.
  • Auto re-uploading segments if there is no deep URL in the segment metadata.
  • Pauseless tables.

Test
We ran a manual test in Uber's Pinot cluster. It confirmed that, when the latest segments in the source table are consuming segments, the new consuming segments in the target cluster copy the sequence number (rather than starting from 0) and the startOffset per partition, and that the consuming status is Active.

Design Draft

codecov-commenter commented Nov 19, 2025

Codecov Report

❌ Patch coverage is 37.64045% with 111 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.24%. Comparing base (82b7d2a) to head (6aa1cc0).
⚠️ Report is 18 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...oller/api/resources/PinotTableRestletResource.java | 19.51% | 63 Missing and 3 partials ⚠️ |
| ...ot/controller/api/resources/CopyTableResponse.java | 0.00% | 22 Missing ⚠️ |
| ...ntroller/helix/core/PinotHelixResourceManager.java | 42.30% | 13 Missing and 2 partials ⚠️ |
| ...ller/api/resources/PinotRealtimeTableResource.java | 0.00% | 6 Missing ⚠️ |
| ...not/controller/api/resources/CopyTablePayload.java | 83.33% | 2 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17235      +/-   ##
============================================
- Coverage     63.25%   63.24%   -0.01%     
- Complexity     1475     1476       +1     
============================================
  Files          3162     3167       +5     
  Lines        188668   188992     +324     
  Branches      28869    28916      +47     
============================================
+ Hits         119348   119537     +189     
- Misses        60074    60180     +106     
- Partials       9246     9275      +29     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.21% <37.64%> (-0.01%) ⬇️ |
| java-21 | 63.20% <37.64%> (-0.03%) ⬇️ |
| temurin | 63.24% <37.64%> (-0.01%) ⬇️ |
| unittests | 63.24% <37.64%> (-0.01%) ⬇️ |
| unittests1 | 55.57% <100.00%> (-0.02%) ⬇️ |
| unittests2 | 34.04% <37.07%> (+0.04%) ⬆️ |


@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from 77c1c11 to c831445 Compare December 2, 2025 03:59
@wirybeaver wirybeaver requested a review from deemoliu December 5, 2025 03:24
@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from 0c4ad75 to 68a373b Compare December 5, 2025 03:33
@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from 1893728 to 33e7d47 Compare December 12, 2025 05:59
@wirybeaver wirybeaver requested a review from deemoliu December 13, 2025 02:00
@wirybeaver
Contributor Author

@Jackie-Jiang @chenboat Could you take a look? The manual integration test is already done. The purpose of this PR is to copy the schema and table config from the source cluster to the target cluster, replacing the server / broker tenant, and then to copy the consuming segments to kick off message consumption on the new table. The backfill of segments prior to the consuming segments will be implemented in a follow-up PR.

@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from 371998c to 6dfbe51 Compare December 14, 2025 03:03
@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from 758c88b to c27583c Compare December 18, 2025 23:02
@wirybeaver wirybeaver requested a review from deemoliu December 19, 2025 00:07
@wirybeaver
Contributor Author

wirybeaver commented Dec 19, 2025

The unit test Set-1 fails on Java 11 but succeeds on Java 21. The error is a "selectionCombineOperator Not found" issue, which is unrelated to table copy. I only did a rebase, with no conflicts to resolve. Previous test runs were all successful.

@wirybeaver
Contributor Author

Walked through the design doc with @abhishekbafna on 12/23 and aligned on the direction of the solution.

@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from ccb638a to 4cf3fae Compare January 6, 2026 04:29
@wirybeaver
Contributor Author

@abhishekbafna I added dryRun as a boolean flag in the payload. The response also contains the schema, the modified table config and the watermarkResult. The write operations (schema and table config addition) are moved to the end.
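
A rough sketch of that ordering, assuming a simplified shape for the request and response: the response is built first from read-only work, and the writes run last and only when dryRun is false. `CopyTableDryRunSketch`, `TargetCluster` and `Response` are hypothetical names for illustration and do not mirror the actual CopyTablePayload / CopyTableResponse classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the dry-run ordering; not the actual controller code. */
final class CopyTableDryRunSketch {

  /** Stand-in for the target cluster; records which writes were performed. */
  static final class TargetCluster {
    final List<String> writes = new ArrayList<>();

    void addSchema(String schemaJson) {
      writes.add("schema");
    }

    void addTableConfig(String tableConfigJson) {
      writes.add("tableConfig");
    }
  }

  /** What the caller gets back in both dry-run and real mode (assumed fields). */
  static final class Response {
    final String schema;
    final String tableConfig;
    final String watermarkResult;
    final boolean dryRun;

    Response(String schema, String tableConfig, String watermarkResult, boolean dryRun) {
      this.schema = schema;
      this.tableConfig = tableConfig;
      this.watermarkResult = watermarkResult;
      this.dryRun = dryRun;
    }
  }

  static Response copyTable(String schema, String modifiedTableConfig, String watermarkResult,
      boolean dryRun, TargetCluster target) {
    // Read-only work (fetching the schema, rewriting tenants, deriving watermarks) is assumed
    // to have produced the three inputs, so the response can be built before any write.
    Response response = new Response(schema, modifiedTableConfig, watermarkResult, dryRun);

    // Writes come last, so a failure earlier on leaves the target cluster untouched.
    if (!dryRun) {
      target.addSchema(schema);
      target.addTableConfig(modifiedTableConfig);
    }
    return response;
  }
}
```

Keeping the writes at the end means a dry run and a real run share the same code path up to the final step, so the dry-run response shows what a real run would write.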

@wirybeaver
Contributor Author

MSE against 1.3 failed but succeeded against 1.4 and master.

Collaborator

@abhishekbafna abhishekbafna left a comment

LGTM

@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/{tableName}/consumerWatermarks")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_IDEAL_STATE)
@ApiOperation(value = "Get table ideal state", notes = "Get table ideal state")
Contributor

Why is it about "table ideal state"? Is it a typo?

Contributor Author

Good catch.

}

/**
* Performs validations of table config and adds the table to zookeeper
Contributor

Is there any restriction on this API? Realtime table only? Can it add upsert tables? If so, please clarify in the javadoc.

* The {@code PartitionGroupInfo} class represents the metadata and sequence number for a partition group.
* It encapsulates the {@link PartitionGroupMetadata} and the sequence number associated with it.
*/
public class PartitionGroupInfo {
Contributor

Why do we need a wrapper class like this? Can we reuse PartitionGroupMetadata or add to it instead of creating a new class?

Contributor Author

@wirybeaver wirybeaver Jan 8, 2026

From a pure Kafka consumer point of view, the segment sequence number doesn't seem to belong in PartitionGroupMetadata.
I am also concerned that changing the constructor to add the sequence number might cause review rejections, because many places already use PartitionGroupMetadata. A wrapper class has the benefit of requiring fewer changes to existing code.

- Updated PinotRealtimeTableResource.java: Changed misleading JavaDoc from
  "Get table ideal state" to accurately describe the consumer watermarks endpoint
- Updated PinotHelixResourceManager.java: Added explicit documentation clarifying
  that the API is restricted to realtime tables only and works with both upsert
  and non-upsert tables
- Updated PinotHelixResourceManager.addTable() to accept List<Pair<PartitionGroupMetadata, Integer>>
- Updated PinotLLCRealtimeSegmentManager.setUpNewTable() to use Pair instead of PartitionGroupInfo
- Refactored setupNewPartitionGroup() to accept metadata and sequence separately
- Updated PinotTableRestletResource copyTable endpoint to create Pair instances
- Updated unit tests in PinotHelixResourceManagerStatelessTest and PartitionGroupInfoTest
- Added necessary imports for PartitionGroupMetadata and Pair
- Deleted PartitionGroupInfo.java as it has been replaced with Pair<PartitionGroupMetadata, Integer>
- Deleted PartitionGroupInfoTest.java as the class is no longer needed
- Verified no other classes reference PartitionGroupInfo
@wirybeaver wirybeaver force-pushed the oss/tableCopyConsuming branch from 9f6c222 to 6aa1cc0 Compare January 9, 2026 07:19