diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 36a2d0a1ac586..eeb8ce3002e95 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -2317,7 +2317,7 @@ public void testAcquireWithMaxInFlightRecordsAndReleaseLastOffset() { // Release middle batch. CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(15, 19, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(15, 19, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); // Validate the nextFetchOffset is updated to 15. @@ -2342,7 +2342,7 @@ public void testAcquireWithMaxInFlightRecordsAndReleaseLastOffset() { // Release last offset of the acquired batch. Only 1 record should be released and later acquired. ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(29, 29, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(29, 29, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); // Validate the nextFetchOffset is updated to 29. @@ -2390,7 +2390,7 @@ public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords // Release middle batch. CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); // Validate the nextFetchOffset is updated to 5. @@ -2442,7 +2442,7 @@ public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords // Release only 1 middle batch. CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); // Validate the nextFetchOffset is updated to 5. @@ -2637,7 +2637,7 @@ public void testAcknowledgeSingleRecordBatch() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(1, 1, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(1, 1, List.of(AcknowledgeType.ACCEPT.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -2666,7 +2666,7 @@ public void testAcknowledgeMultipleRecordBatch() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.ACCEPT.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -2701,11 +2701,11 @@ public void testAcknowledgeMultipleRecordBatchWithGapOffsets() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 6, List.of((byte) 2)), + new ShareAcknowledgementBatch(5, 6, List.of(AcknowledgeType.RELEASE.id)), new ShareAcknowledgementBatch(10, 18, List.of( - (byte) 2, (byte) 2, (byte) 2, - (byte) 2, (byte) 2, (byte) 0, - (byte) 0, (byte) 0, (byte) 1 + AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, + AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, ACKNOWLEDGE_TYPE_GAP_ID, + ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id )))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -2763,11 +2763,11 @@ public void testAcknowledgeMultipleSubsetRecordBatchWithGapOffsets() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, List.of(new ShareAcknowledgementBatch(6, 18, List.of( - (byte) 1, (byte) 1, (byte) 1, - (byte) 1, (byte) 1, (byte) 1, - (byte) 0, (byte) 0, (byte) 1, - (byte) 0, (byte) 1, (byte) 0, - (byte) 1)))); + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -2806,7 +2806,7 @@ public void testAcknowledgeOutOfRangeCachedData() { // Acknowledge a batch when cache is empty. CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(0, 15, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(0, 15, List.of(AcknowledgeType.REJECT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); @@ -2820,7 +2820,7 @@ public void testAcknowledgeOutOfRangeCachedData() { ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(20, 25, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(20, 25, List.of(AcknowledgeType.REJECT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRequestException.class, ackResult); assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -2844,8 +2844,8 @@ public void testAcknowledgeOutOfRangeCachedDataFirstBatch() { // Acknowledge a batch when first batch violates the range. List acknowledgeBatches = List.of( - new ShareAcknowledgementBatch(0, 10, List.of((byte) 1)), - new ShareAcknowledgementBatch(20, 24, List.of((byte) 1))); + new ShareAcknowledgementBatch(0, 10, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.ACCEPT.id))); CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, acknowledgeBatches); assertTrue(ackResult.isCompletedExceptionally()); @@ -2881,7 +2881,7 @@ public void testAcknowledgeWithAnotherMember() { CompletableFuture ackResult = sharePartition.acknowledge( "member-2", - List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.REJECT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); } @@ -2899,7 +2899,7 @@ public void testAcknowledgeWhenOffsetNotAcquired() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -2909,7 +2909,7 @@ public void testAcknowledgeWhenOffsetNotAcquired() { // Acknowledge the same batch again but with ACCEPT type. ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); @@ -2921,7 +2921,7 @@ public void testAcknowledgeWhenOffsetNotAcquired() { ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(6, 8, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(6, 8, List.of(AcknowledgeType.REJECT.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); assertEquals(3, sharePartition.inFlightTerminalRecords()); @@ -2929,7 +2929,7 @@ public void testAcknowledgeWhenOffsetNotAcquired() { // Re-acknowledge the subset batch with REJECT type. ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(6, 8, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(6, 8, List.of(AcknowledgeType.REJECT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); assertEquals(3, sharePartition.inFlightTerminalRecords()); @@ -2958,11 +2958,11 @@ public void testAcknowledgeRollbackWithFullBatchError() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)), - new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)), - new ShareAcknowledgementBatch(15, 19, List.of((byte) 1)), + new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(15, 19, List.of(AcknowledgeType.ACCEPT.id)), // Add another batch which should fail the request. - new ShareAcknowledgementBatch(15, 19, List.of((byte) 1)))); + new ShareAcknowledgementBatch(15, 19, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); @@ -2997,11 +2997,11 @@ public void testAcknowledgeRollbackWithSubsetError() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)), - new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)), - new ShareAcknowledgementBatch(15, 19, List.of((byte) 1)), + new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(15, 19, List.of(AcknowledgeType.ACCEPT.id)), // Add another batch which should fail the request. - new ShareAcknowledgementBatch(16, 19, List.of((byte) 1)))); + new ShareAcknowledgementBatch(16, 19, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); @@ -3027,7 +3027,7 @@ public void testAcquireReleasedRecord() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(12, 13, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(12, 13, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -3095,7 +3095,7 @@ public void testAcquireReleasedRecordMultipleBatches() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(12, 30, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(12, 30, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -4067,7 +4067,7 @@ public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws Interru assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of((byte) 2)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RELEASE.id)))); assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(0, sharePartition.timer().size()); @@ -4101,7 +4101,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws Inter assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(5, sharePartition.nextFetchOffset()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState()); @@ -4154,7 +4154,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets( sharePartition.acknowledge(MEMBER_ID, // Do not send gap offsets to verify that they are ignored and accepted as per client ack. - List.of(new ShareAcknowledgementBatch(5, 18, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 18, List.of(AcknowledgeType.ACCEPT.id)))); assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); @@ -4282,11 +4282,11 @@ public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOf // Acknowledging over subset of both batch with subset of gap offsets. sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch( 6, 18, List.of( - (byte) 1, (byte) 1, (byte) 1, - (byte) 1, (byte) 1, (byte) 1, - (byte) 0, (byte) 0, (byte) 1, - (byte) 0, (byte) 1, (byte) 0, - (byte) 1)))); + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id)))); assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); @@ -4551,7 +4551,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep // Acknowledge with ACCEPT type should throw InvalidRecordStateException since they've been released due to acquisition lock timeout. CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); @@ -4560,7 +4560,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep // Try acknowledging with REJECT type should throw InvalidRecordStateException since they've been released due to acquisition lock timeout. ackResult = sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.REJECT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); @@ -4580,8 +4580,8 @@ public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedEx assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); - // Acknowledge with REJECT type. - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 6, List.of((byte) 2)))); + // Acknowledge with RELEASE type. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 6, List.of(AcknowledgeType.RELEASE.id)))); assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); @@ -4592,7 +4592,7 @@ public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedEx assertEquals(0, sharePartition.inFlightTerminalRecords()); // Acknowledge with ACCEPT type. - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte) 1)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id)))); assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); @@ -4747,7 +4747,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnOffsetLastDe assertEquals(1, sharePartition.timer().size()); assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte) 1)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -4835,7 +4835,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws assertEquals(1, sharePartition.timer().size()); assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte) 1)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id)))); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( @@ -4919,7 +4919,7 @@ public void testReleaseMultipleAcknowledgedRecordBatch() { fetchAcquiredRecords(sharePartition, records1, 2); fetchAcquiredRecords(sharePartition, records2, 9); - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 18, List.of((byte) 1)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 18, List.of(AcknowledgeType.ACCEPT.id)))); // After the acknowledgements, the cached state has 11 Terminal records -> // (5 -> 6) // (10 -> 18) @@ -4957,11 +4957,11 @@ public void testReleaseAcknowledgedMultipleSubsetRecordBatch() { // Acknowledging over subset of both batch with subset of gap offsets. sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(6, 18, List.of( - (byte) 1, (byte) 1, (byte) 1, - (byte) 1, (byte) 1, (byte) 1, - (byte) 0, (byte) 0, (byte) 1, - (byte) 0, (byte) 1, (byte) 0, - (byte) 1)))); + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id)))); // After the acknowledgements, the cached state has 10 Terminal records -> // 6 @@ -5014,9 +5014,9 @@ public void testReleaseAcquiredRecordsWithAnotherMember() { // Acknowledging over subset of second batch with subset of gap offsets. sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(10, 18, List.of( - (byte) 1, (byte) 1, (byte) 0, (byte) 0, - (byte) 1, (byte) 0, (byte) 1, (byte) 0, - (byte) 1)))); + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id)))); // After the acknowledgements, the cached state has 9 Terminal records -> // (10 -> 18) @@ -5090,9 +5090,9 @@ public void testReleaseAcquiredRecordsWithAnotherMemberAndSubsetAcknowledged() { // Acknowledging over subset of second batch with subset of gap offsets. sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(10, 18, List.of( - (byte) 1, (byte) 1, (byte) 0, (byte) 0, - (byte) 1, (byte) 0, (byte) 1, (byte) 0, - (byte) 1)))); + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id)))); // After the acknowledgements, the cached state has 9 Terminal records -> // (10 -> 18) @@ -5124,7 +5124,7 @@ public void testReleaseAcquiredRecordsWithAnotherMemberAndSubsetAcknowledged() { // Ack subset of records by "member-2". sharePartition.acknowledge("member-2", - List.of(new ShareAcknowledgementBatch(5, 5, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 5, List.of(AcknowledgeType.ACCEPT.id)))); // After the acknowledgements, the startOffset will be upadated to 6, since offset 5 is Terminal. Hence // inFlightTerminalRecords will remain 9. assertEquals(9, sharePartition.inFlightTerminalRecords()); @@ -5173,10 +5173,10 @@ public void testReleaseAcquiredRecordsAfterDifferentAcknowledges() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 6, List.of((byte) 2)))); + List.of(new ShareAcknowledgementBatch(5, 6, List.of(AcknowledgeType.RELEASE.id)))); sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -5208,7 +5208,7 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu fetchAcquiredRecords(sharePartition, records2, 5); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(10, 14, List.of((byte) 2)))); + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(0, sharePartition.inFlightTerminalRecords()); fetchAcquiredRecords(sharePartition, records2, 5); @@ -5242,9 +5242,9 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu fetchAcquiredRecords(sharePartition, records3, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( - new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)), - new ShareAcknowledgementBatch(17, 19, List.of((byte) 3)), - new ShareAcknowledgementBatch(20, 24, List.of((byte) 2)) + new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(17, 19, List.of(AcknowledgeType.REJECT.id)), + new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)) ))); assertEquals(3, sharePartition.inFlightTerminalRecords()); @@ -5306,10 +5306,10 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() { fetchAcquiredRecords(sharePartition, records3, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( - new ShareAcknowledgementBatch(10, 12, List.of((byte) 1)), - new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)), - new ShareAcknowledgementBatch(17, 19, List.of((byte) 3)), - new ShareAcknowledgementBatch(20, 24, List.of((byte) 2)) + new ShareAcknowledgementBatch(10, 12, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(17, 19, List.of(AcknowledgeType.REJECT.id)), + new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)) ))); // After acknowledgements, since offsets 10 -> 12 are at the start of the caches state and are in Terminal state, @@ -5323,8 +5323,8 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() { fetchAcquiredRecords(sharePartition, records3, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( - new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)), - new ShareAcknowledgementBatch(20, 24, List.of((byte) 2)) + new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)) ))); assertEquals(25, sharePartition.nextFetchOffset()); @@ -5339,7 +5339,7 @@ public void testReleaseAcquiredRecordsSubsetWithAnotherMember() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 7); sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 7, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 7, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -5408,7 +5408,7 @@ public void testReleaseOffsetWithWriteShareGroupStateFailure() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6); sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -5483,11 +5483,11 @@ public void testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchW // Acknowledging over subset of both batch with subset of gap offsets. sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(6, 18, List.of( - (byte) 1, (byte) 1, (byte) 1, - (byte) 1, (byte) 1, (byte) 1, - (byte) 0, (byte) 0, (byte) 1, - (byte) 0, (byte) 1, (byte) 0, - (byte) 1)))); + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, + ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID, + AcknowledgeType.ACCEPT.id)))); assertEquals(10, sharePartition.inFlightTerminalRecords()); @@ -5659,10 +5659,10 @@ public void testLsoMovementForArchivingBatches() { fetchAcquiredRecords(sharePartition, memoryRecords(32, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 6, List.of((byte) 1)), - new ShareAcknowledgementBatch(12, 16, List.of((byte) 2)), - new ShareAcknowledgementBatch(22, 26, List.of((byte) 2)), - new ShareAcknowledgementBatch(27, 31, List.of((byte) 3)) + new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(12, 16, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(22, 26, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(27, 31, List.of(AcknowledgeType.REJECT.id)) )); // After the acknowledgements, the records in Terminal state are -> @@ -5722,10 +5722,10 @@ public void testLsoMovementPostArchivedBatches() { sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 6, List.of((byte) 2)), - new ShareAcknowledgementBatch(12, 16, List.of((byte) 3)), - new ShareAcknowledgementBatch(22, 26, List.of((byte) 2)), - new ShareAcknowledgementBatch(27, 31, List.of((byte) 3)) + new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(12, 16, List.of(AcknowledgeType.REJECT.id)), + new ShareAcknowledgementBatch(22, 26, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(27, 31, List.of(AcknowledgeType.REJECT.id)) )); // After the acknowledgements, the records in Terminal state are -> @@ -5788,11 +5788,11 @@ public void testLsoMovementPostArchivedRecords() { sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 6, List.of((byte) 2)), - new ShareAcknowledgementBatch(12, 16, List.of((byte) 3)), - new ShareAcknowledgementBatch(19, 21, List.of((byte) 3)), - new ShareAcknowledgementBatch(22, 26, List.of((byte) 2)), - new ShareAcknowledgementBatch(27, 31, List.of((byte) 3)) + new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(12, 16, List.of(AcknowledgeType.REJECT.id)), + new ShareAcknowledgementBatch(19, 21, List.of(AcknowledgeType.REJECT.id)), + new ShareAcknowledgementBatch(22, 26, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(27, 31, List.of(AcknowledgeType.REJECT.id)) )); // After the acknowledgements, the records in Terminal state are -> @@ -5860,8 +5860,8 @@ public void testLsoMovementForArchivingAllAvailableBatches() { // 3. 31 -> 40: AVAILABLE // 4. 41 -> 50: ACQUIRED sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)), - new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)) + new ShareAcknowledgementBatch(11, 20, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(31, 40, List.of(AcknowledgeType.RELEASE.id)) )); assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -5890,7 +5890,7 @@ public void testLsoMovementForArchivingAllAvailableBatches() { // The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these // records will remain in the ACQUIRED state. - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of(AcknowledgeType.RELEASE.id)))); // The acknowledgements make no difference to in flight records. assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -5921,8 +5921,8 @@ public void testLsoMovementForArchivingAllAvailableOffsets() { // 3. 31 -> 40: AVAILABLE // 4. 41 -> 50: ACQUIRED sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)), - new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)) + new ShareAcknowledgementBatch(11, 20, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(31, 40, List.of(AcknowledgeType.RELEASE.id)) )); assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -5960,7 +5960,7 @@ public void testLsoMovementForArchivingAllAvailableOffsets() { // The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these // records will remain in the ACQUIRED state. - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(0, sharePartition.inFlightTerminalRecords()); // The batch is still in ACQUIRED state. @@ -5982,7 +5982,7 @@ public void testLsoMovementForArchivingOffsets() { fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(4, 8, List.of((byte) 1)))); + new ShareAcknowledgementBatch(4, 8, List.of(AcknowledgeType.ACCEPT.id)))); // LSO at is 5. sharePartition.updateCacheAndOffsets(5); @@ -6117,7 +6117,7 @@ public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostA // Acknowledge with ACCEPT action. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(7, 8, List.of((byte) 1)))); + new ShareAcknowledgementBatch(7, 8, List.of(AcknowledgeType.ACCEPT.id)))); // LSO is at 7. sharePartition.updateCacheAndOffsets(7); @@ -6163,7 +6163,7 @@ public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostR // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(7, 8, List.of((byte) 2)))); + new ShareAcknowledgementBatch(7, 8, List.of(AcknowledgeType.RELEASE.id)))); // LSO is at 7. sharePartition.updateCacheAndOffsets(7); @@ -6195,7 +6195,7 @@ public void testLsoMovementToEndOffset() { // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(7, 8, List.of((byte) 2)))); + new ShareAcknowledgementBatch(7, 8, List.of(AcknowledgeType.RELEASE.id)))); // LSO is at 11. sharePartition.updateCacheAndOffsets(11); @@ -6227,8 +6227,8 @@ public void testLsoMovementToEndOffsetWhereEndOffsetIsAvailable() { // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(7, 8, List.of((byte) 2)), - new ShareAcknowledgementBatch(11, 11, List.of((byte) 2)))); + new ShareAcknowledgementBatch(7, 8, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(11, 11, List.of(AcknowledgeType.RELEASE.id)))); // LSO is at 11. sharePartition.updateCacheAndOffsets(11); @@ -6260,7 +6260,7 @@ public void testLsoMovementAheadOfEndOffsetPostAcknowledgement() { // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(7, 8, List.of((byte) 2)))); + new ShareAcknowledgementBatch(7, 8, List.of(AcknowledgeType.RELEASE.id)))); // LSO is at 12. sharePartition.updateCacheAndOffsets(12); @@ -6350,7 +6350,7 @@ public void testLsoMovementWithGapsInCachedStateMapAndAcknowledgedBatch() { // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(10, 14, List.of((byte) 2)))); + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.RELEASE.id)))); // LSO is at 10. sharePartition.updateCacheAndOffsets(10); @@ -6382,9 +6382,11 @@ public void testLsoMovementPostGapsInAcknowledgements() { fetchAcquiredRecords(sharePartition, records2, 9); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 6, List.of((byte) 2)), + new ShareAcknowledgementBatch(5, 6, List.of(AcknowledgeType.RELEASE.id)), new ShareAcknowledgementBatch(10, 18, List.of( - (byte) 2, (byte) 2, (byte) 2, (byte) 2, (byte) 2, (byte) 0, (byte) 0, (byte) 0, (byte) 2 + AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, + AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, ACKNOWLEDGE_TYPE_GAP_ID, + ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.RELEASE.id )))); // LSO is at 18. @@ -6459,7 +6461,7 @@ public void testTerminalRecordsNotUpdatedWhenBatchesBeforeStartOffsetAreExpired( // Acknowledge the acquired records. Since these records are present before the startOffset, these acknowledgements // will simply be ignored. - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of((byte) 2)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of(AcknowledgeType.RELEASE.id)))); // Since the acknowledgements are ignored, the inFlightTerminalRecords should not change. assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -6598,7 +6600,7 @@ public void testTerminalRecordsNotUpdatedWhenOffsetsBeforeStartOffsetAreExpiredA Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); // Acknowledge the acquired records. Only those records that are after the startOffset will be acknowledged. // In this case, records 11 -> 15 will remain in the ACQUIRED state, while records 16 -> 20 will be RELEASED. - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of((byte) 2)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(0, sharePartition.inFlightTerminalRecords()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(11L).offsetState().get(11L).state()); @@ -6655,10 +6657,10 @@ public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovement() { // Acknowledge records. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(6, 7, List.of((byte) 1)), - new ShareAcknowledgementBatch(8, 8, List.of((byte) 2)), - new ShareAcknowledgementBatch(25, 29, List.of((byte) 2)), - new ShareAcknowledgementBatch(35, 37, List.of((byte) 2)) + new ShareAcknowledgementBatch(6, 7, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(8, 8, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(25, 29, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(35, 37, List.of(AcknowledgeType.RELEASE.id)) )); assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -6795,7 +6797,7 @@ public void testReleaseAcquiredRecordsDecreaseDeliveryCount() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); - sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(12, 13, List.of((byte) 1)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(12, 13, List.of(AcknowledgeType.ACCEPT.id)))); // Records 12 and 13 are ACKNOWLEDGED. assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -6858,10 +6860,10 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws // Acknowledge records. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(6, 7, List.of((byte) 1)), - new ShareAcknowledgementBatch(8, 8, List.of((byte) 2)), - new ShareAcknowledgementBatch(25, 29, List.of((byte) 2)), - new ShareAcknowledgementBatch(35, 37, List.of((byte) 2)) + new ShareAcknowledgementBatch(6, 7, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(8, 8, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(25, 29, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(35, 37, List.of(AcknowledgeType.RELEASE.id)) )); assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -7061,8 +7063,8 @@ public void testAcknowledgeBatchAndOffsetPostLsoMovement() { // Acknowledge with RELEASE action. CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 6, List.of((byte) 2)), - new ShareAcknowledgementBatch(10, 14, List.of((byte) 2)))); + new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.RELEASE.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); @@ -7124,7 +7126,7 @@ public void testAcknowledgeBatchPostLsoMovement() { // Acknowledge with ACCEPT action. CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 14, List.of((byte) 1)))); + new ShareAcknowledgementBatch(2, 14, List.of(AcknowledgeType.ACCEPT.id)))); assertNull(ackResult.join()); assertFalse(ackResult.isCompletedExceptionally()); // Only record 14 is post startOffset and in a Terminal state. Thus, only that is considered for inFlightTerminalRecords. @@ -7195,7 +7197,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In // Acknowledge with RELEASE action. This contains a batch that doesn't exist at all. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 14, List.of((byte) 2)))); + new ShareAcknowledgementBatch(2, 14, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(10, sharePartition.nextFetchOffset()); assertEquals(10, sharePartition.startOffset()); @@ -7251,7 +7253,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff // Acknowledge with RELEASE action. This contains a batch that doesn't exist at all. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(1, 7, List.of((byte) 2)))); + new ShareAcknowledgementBatch(1, 7, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(3, sharePartition.nextFetchOffset()); assertEquals(3, sharePartition.startOffset()); @@ -7502,7 +7504,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeAccept() { assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 249, List.of((byte) 1)))); + new ShareAcknowledgementBatch(0, 249, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(250, sharePartition.nextFetchOffset()); // The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED. @@ -7523,7 +7525,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeReject() { assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 249, List.of((byte) 3)))); + new ShareAcknowledgementBatch(0, 249, List.of((AcknowledgeType.REJECT.id))))); assertEquals(250, sharePartition.nextFetchOffset()); // The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED. @@ -7543,7 +7545,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeRelease() { assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 249, List.of((byte) 2)))); + new ShareAcknowledgementBatch(0, 249, List.of(AcknowledgeType.RELEASE.id)))); // The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED. assertEquals(0, sharePartition.startOffset()); @@ -7571,7 +7573,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForBatchS assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 12, List.of((byte) 1)))); + new ShareAcknowledgementBatch(0, 12, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(12L).state()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(13L).state()); @@ -7597,7 +7599,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForEntire assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 14, List.of((byte) 3)))); + new ShareAcknowledgementBatch(0, 14, List.of(AcknowledgeType.REJECT.id)))); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); assertEquals(MEMBER_ID, sharePartition.cachedState().get(15L).batchMemberId()); @@ -7624,7 +7626,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgementsInBetween() { assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(10, 14, List.of((byte) 3)))); + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.REJECT.id)))); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(9L).state()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(0L).offsetState().get(10L).state()); @@ -7655,7 +7657,7 @@ public void testMaybeUpdateCachedStateWhenAllRecordsInCachedStateAreAcknowledged assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 29, List.of((byte) 1)))); + new ShareAcknowledgementBatch(0, 29, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(sharePartition.canAcquireRecords()); assertEquals(30, sharePartition.startOffset()); @@ -7683,7 +7685,7 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() // First Acknowledgement for the first batch of records 0-19. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 19, List.of((byte) 1)))); + new ShareAcknowledgementBatch(0, 19, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(sharePartition.canAcquireRecords()); assertEquals(20, sharePartition.startOffset()); @@ -7695,7 +7697,7 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() assertTrue(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(20, 49, List.of((byte) 1)))); + new ShareAcknowledgementBatch(20, 49, List.of(AcknowledgeType.ACCEPT.id)))); assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(40L).offsetState().get(49L).state()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(40L).offsetState().get(50L).state()); @@ -7711,7 +7713,7 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() // Final Acknowledgement, all records are acknowledged here. sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(50, 179, List.of((byte) 3)))); + new ShareAcknowledgementBatch(50, 179, List.of(AcknowledgeType.REJECT.id)))); assertEquals(0, sharePartition.cachedState().size()); assertTrue(sharePartition.canAcquireRecords()); @@ -7761,7 +7763,7 @@ public void testMaybeUpdateCachedStateGapAfterLastOffsetAcknowledged() { // Sending acknowledgement for the first batch from 11 to 20 sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(11, 20, List.of((byte) 1)))); + new ShareAcknowledgementBatch(11, 20, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(sharePartition.canAcquireRecords()); // After the acknowledgement is done successfully, maybeUpdateCachedStateAndOffsets method is invoked to see @@ -7810,7 +7812,7 @@ public void testCanAcquireRecordsChangeResponsePostAcknowledgement() { assertEquals(249, sharePartition.endOffset()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 249, List.of((byte) 1)))); + new ShareAcknowledgementBatch(0, 249, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(sharePartition.canAcquireRecords()); assertEquals(250, sharePartition.startOffset()); @@ -7832,7 +7834,7 @@ public void testCanAcquireRecordsAfterReleaseAcknowledgement() { assertEquals(249, sharePartition.endOffset()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 89, List.of((byte) 2)))); + new ShareAcknowledgementBatch(0, 89, List.of(AcknowledgeType.RELEASE.id)))); // The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED. assertEquals(0, sharePartition.startOffset()); @@ -7856,7 +7858,7 @@ public void testCanAcquireRecordsAfterArchiveAcknowledgement() { assertEquals(249, sharePartition.endOffset()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 89, List.of((byte) 3)))); + new ShareAcknowledgementBatch(0, 89, List.of(AcknowledgeType.REJECT.id)))); // The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED. assertEquals(90, sharePartition.startOffset()); @@ -7879,7 +7881,7 @@ public void testCanAcquireRecordsAfterAcceptAcknowledgement() { assertEquals(249, sharePartition.endOffset()); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 89, List.of((byte) 1)))); + new ShareAcknowledgementBatch(0, 89, List.of(AcknowledgeType.ACCEPT.id)))); // The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED. assertEquals(90, sharePartition.startOffset()); @@ -7906,7 +7908,7 @@ public void testAcknowledgeBatchWithWriteShareGroupStateFailure() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(UnknownTopicOrPartitionException.class, ackResult); @@ -7935,7 +7937,7 @@ public void testAcknowledgeOffsetWithWriteShareGroupStateFailure() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6); CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, - List.of(new ShareAcknowledgementBatch(8, 10, List.of((byte) 3)))); + List.of(new ShareAcknowledgementBatch(8, 10, List.of(AcknowledgeType.REJECT.id)))); assertTrue(ackResult.isCompletedExceptionally()); // Due to failure in writeShareGroupState, the cached state should not be updated. @@ -7961,11 +7963,11 @@ public void testAcknowledgeSubsetWithAnotherMember() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 7); sharePartition.acknowledge(MEMBER_ID, - List.of(new ShareAcknowledgementBatch(5, 7, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(5, 7, List.of(AcknowledgeType.ACCEPT.id)))); // Acknowledge subset with another member. CompletableFuture ackResult = sharePartition.acknowledge("member-2", - List.of(new ShareAcknowledgementBatch(9, 11, List.of((byte) 1)))); + List.of(new ShareAcknowledgementBatch(9, 11, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); } @@ -7981,10 +7983,10 @@ public void testAcknowledgeWithAnotherMemberRollbackBatchError() { fetchAcquiredRecords(sharePartition, memoryRecords(15, 5), 5); CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)), + new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)), // Acknowledging batch with another member will cause failure and rollback. - new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)), - new ShareAcknowledgementBatch(15, 19, List.of((byte) 1)))); + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(15, 19, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); @@ -8010,10 +8012,10 @@ public void testAcknowledgeWithAnotherMemberRollbackSubsetError() { sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM); CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)), - new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)), + new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.ACCEPT.id)), // Acknowledging subset with another member will cause failure and rollback. - new ShareAcknowledgementBatch(16, 18, List.of((byte) 1)))); + new ShareAcknowledgementBatch(16, 18, List.of(AcknowledgeType.ACCEPT.id)))); assertTrue(ackResult.isCompletedExceptionally()); assertFutureThrows(InvalidRecordStateException.class, ackResult); @@ -8040,11 +8042,11 @@ public void testMaxDeliveryCountLimitExceededForRecordBatch() { fetchAcquiredRecords(sharePartition, records, 10); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 14, List.of((byte) 2)))); + new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id)))); fetchAcquiredRecords(sharePartition, records, 10); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 14, List.of((byte) 2)))); + new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id)))); // All the records in the batch reached the max delivery count, hence they got archived and the cached state cleared. assertEquals(15, sharePartition.nextFetchOffset()); @@ -8068,9 +8070,9 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubset() { fetchAcquiredRecords(sharePartition, records2, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( - new ShareAcknowledgementBatch(10, 12, List.of((byte) 1)), - new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)), - new ShareAcknowledgementBatch(17, 19, List.of((byte) 1))))); + new ShareAcknowledgementBatch(10, 12, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(17, 19, List.of(AcknowledgeType.ACCEPT.id))))); // Send next batch from offset 13, only 2 records should be acquired. fetchAcquiredRecords(sharePartition, records1, 2); @@ -8078,7 +8080,7 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubset() { fetchAcquiredRecords(sharePartition, records2, 2); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)))); + new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(20, sharePartition.nextFetchOffset()); // Cached state will be empty because after the second release, the acquired records will now have moved to @@ -8098,12 +8100,12 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetAndCachedStateNotCl fetchAcquiredRecords(sharePartition, records1, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( - new ShareAcknowledgementBatch(0, 1, List.of((byte) 2))))); + new ShareAcknowledgementBatch(0, 1, List.of(AcknowledgeType.RELEASE.id))))); // Send next batch from offset 0, only 2 records should be acquired. fetchAcquiredRecords(sharePartition, memoryRecords(2), 2); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(0, 4, List.of((byte) 2)))); + new ShareAcknowledgementBatch(0, 4, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(2, sharePartition.nextFetchOffset()); assertEquals(1, sharePartition.cachedState().size()); @@ -8135,7 +8137,7 @@ public void testNextFetchOffsetPostAcquireAndAcknowledgeFunctionality() { assertEquals(20, sharePartition.nextFetchOffset()); sharePartition.acknowledge(memberId1, List.of( - new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)))); + new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)))); assertTrue(sharePartition.findNextFetchOffset()); assertEquals(5, sharePartition.nextFetchOffset()); @@ -8161,7 +8163,7 @@ public void testNextFetchOffsetWithMultipleConsumers() { assertEquals(3, sharePartition.nextFetchOffset()); sharePartition.acknowledge(memberId1, List.of( - new ShareAcknowledgementBatch(0, 2, List.of((byte) 2)))); + new ShareAcknowledgementBatch(0, 2, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(0, sharePartition.nextFetchOffset()); assertEquals(0, sharePartition.inFlightTerminalRecords()); @@ -8172,7 +8174,7 @@ public void testNextFetchOffsetWithMultipleConsumers() { assertEquals(5, sharePartition.nextFetchOffset()); sharePartition.acknowledge(memberId2, List.of( - new ShareAcknowledgementBatch(3, 4, List.of((byte) 2)))); + new ShareAcknowledgementBatch(3, 4, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(3, sharePartition.nextFetchOffset()); assertEquals(0, sharePartition.inFlightTerminalRecords()); } @@ -8185,7 +8187,7 @@ public void testNumberOfWriteCallsOnUpdates() { fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(2, 6, List.of((byte) 1)))); + new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.ACCEPT.id)))); // Acknowledge records will induce 1 write state RPC call via function isWriteShareGroupStateSuccessful. Mockito.verify(sharePartition, Mockito.times(1)).writeShareGroupState(anyList()); @@ -8204,10 +8206,10 @@ public void testReacquireSubsetWithAnotherMember() { fetchAcquiredRecords(sharePartition, memoryRecords(10, 12), 12); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 11, List.of((byte) 2)), - new ShareAcknowledgementBatch(12, 13, List.of((byte) 0)), - new ShareAcknowledgementBatch(14, 15, List.of((byte) 2)), - new ShareAcknowledgementBatch(17, 20, List.of((byte) 2)))); + new ShareAcknowledgementBatch(5, 11, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(12, 13, List.of(ACKNOWLEDGE_TYPE_GAP_ID)), + new ShareAcknowledgementBatch(14, 15, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(17, 20, List.of(AcknowledgeType.RELEASE.id)))); // Records 12-13 have been identified as gaps, hence they are kept in the cache as ARCHIVED state. assertEquals(2, sharePartition.inFlightTerminalRecords()); @@ -8296,8 +8298,8 @@ public void testAcquireWithWriteShareGroupStateDelay() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); List acknowledgementBatches = new ArrayList<>(); - acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3, List.of((byte) 2))); - acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))); + acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3, List.of(AcknowledgeType.RELEASE.id))); + acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id))); // Acknowledge 2-3, 5-9 offsets with RELEASE acknowledge type. sharePartition.acknowledge(MEMBER_ID, acknowledgementBatches);