Skip to content

Commit a663ce3

Browse files
KAFKA-18265: Move acquisition lock classes from share partition (1/N) (#20227)
While working on KAFKA-19476, I realized that we need to refactor SharePartition's read/write lock handling, and I have started some work in that area. For this initial PR, I have moved the AcquisitionLockTimerTask class out of SharePartition; it now calls back into SharePartition through the new AcquisitionLockTimeoutHandler interface. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 93adaea commit a663ce3

File tree

4 files changed

+150
-74
lines changed

4 files changed

+150
-74
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 46 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.kafka.coordinator.group.GroupConfigManager;
4242
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
4343
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
44+
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
45+
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
4446
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
4547
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
4648
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -2391,59 +2393,61 @@ private AcquisitionLockTimerTask acquisitionLockTimerTask(
23912393
long lastOffset,
23922394
long delayMs
23932395
) {
2394-
return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset);
2396+
return new AcquisitionLockTimerTask(time, delayMs, memberId, firstOffset, lastOffset, releaseAcquisitionLockOnTimeout(), sharePartitionMetrics);
23952397
}
23962398

2397-
private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) {
2398-
List<PersisterStateBatch> stateBatches;
2399-
lock.writeLock().lock();
2400-
try {
2401-
Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset);
2402-
if (floorOffset == null) {
2403-
log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition);
2404-
return;
2405-
}
2406-
stateBatches = new ArrayList<>();
2407-
NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
2408-
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
2409-
InFlightBatch inFlightBatch = entry.getValue();
2399+
private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
2400+
return (memberId, firstOffset, lastOffset) -> {
2401+
List<PersisterStateBatch> stateBatches;
2402+
lock.writeLock().lock();
2403+
try {
2404+
Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset);
2405+
if (floorOffset == null) {
2406+
log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition);
2407+
return;
2408+
}
2409+
stateBatches = new ArrayList<>();
2410+
NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
2411+
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
2412+
InFlightBatch inFlightBatch = entry.getValue();
24102413

2411-
if (inFlightBatch.offsetState() == null
2414+
if (inFlightBatch.offsetState() == null
24122415
&& inFlightBatch.batchState() == RecordState.ACQUIRED
24132416
&& checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) {
24142417

2415-
// For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some
2416-
// acquired records that need to move to archived state despite their delivery count.
2417-
inFlightBatch.maybeInitializeOffsetStateUpdate();
2418-
}
2418+
// For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some
2419+
// acquired records that need to move to archived state despite their delivery count.
2420+
inFlightBatch.maybeInitializeOffsetStateUpdate();
2421+
}
24192422

2420-
// Case when the state of complete batch is valid
2421-
if (inFlightBatch.offsetState() == null) {
2422-
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId);
2423-
} else { // Case when batch has a valid offset state map.
2424-
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset);
2423+
// Case when the state of complete batch is valid
2424+
if (inFlightBatch.offsetState() == null) {
2425+
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId);
2426+
} else { // Case when batch has a valid offset state map.
2427+
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset);
2428+
}
24252429
}
2426-
}
24272430

2428-
if (!stateBatches.isEmpty()) {
2429-
writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
2430-
if (exception != null) {
2431-
log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
2432-
groupId, topicIdPartition, memberId, exception);
2433-
}
2434-
// Even if write share group state RPC call fails, we will still go ahead with the state transition.
2435-
// Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
2436-
maybeUpdateCachedStateAndOffsets();
2437-
});
2431+
if (!stateBatches.isEmpty()) {
2432+
writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
2433+
if (exception != null) {
2434+
log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
2435+
groupId, topicIdPartition, memberId, exception);
2436+
}
2437+
// Even if write share group state RPC call fails, we will still go ahead with the state transition.
2438+
// Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
2439+
maybeUpdateCachedStateAndOffsets();
2440+
});
2441+
}
2442+
} finally {
2443+
lock.writeLock().unlock();
24382444
}
2439-
} finally {
2440-
lock.writeLock().unlock();
2441-
}
24422445

2443-
// If we have an acquisition lock timeout for a share-partition, then we should check if
2444-
// there is a pending share fetch request for the share-partition and complete it.
2445-
// Skip null check for stateBatches, it should always be initialized if reached here.
2446-
maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
2446+
// If we have an acquisition lock timeout for a share-partition, then we should check if
2447+
// there is a pending share fetch request for the share-partition and complete it.
2448+
// Skip null check for stateBatches, it should always be initialized if reached here.
2449+
maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
2450+
};
24472451
}
24482452

24492453
private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch,
@@ -2834,35 +2838,6 @@ void gapStartOffset(long gapStartOffset) {
28342838
}
28352839
}
28362840

2837-
// Visible for testing
2838-
final class AcquisitionLockTimerTask extends TimerTask {
2839-
private final long expirationMs;
2840-
private final String memberId;
2841-
private final long firstOffset;
2842-
private final long lastOffset;
2843-
2844-
AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) {
2845-
super(delayMs);
2846-
this.expirationMs = time.hiResClockMs() + delayMs;
2847-
this.memberId = memberId;
2848-
this.firstOffset = firstOffset;
2849-
this.lastOffset = lastOffset;
2850-
}
2851-
2852-
long expirationMs() {
2853-
return expirationMs;
2854-
}
2855-
2856-
/**
2857-
* The task is executed when the acquisition lock timeout is reached. The task releases the acquired records.
2858-
*/
2859-
@Override
2860-
public void run() {
2861-
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
2862-
releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset);
2863-
}
2864-
}
2865-
28662841
/**
28672842
* The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
28682843
*/

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.kafka.coordinator.group.GroupConfigManager;
5656
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
5757
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
58+
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
5859
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
5960
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
6061
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
@@ -5153,7 +5154,7 @@ public void testScheduleAcquisitionLockTimeoutValueFromGroupConfig() {
51535154
SharePartition sharePartition = SharePartitionBuilder.builder()
51545155
.withGroupConfigManager(groupConfigManager).build();
51555156

5156-
SharePartition.AcquisitionLockTimerTask timerTask = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
5157+
AcquisitionLockTimerTask timerTask = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
51575158

51585159
Mockito.verify(groupConfigManager, Mockito.times(2)).groupConfig(GROUP_ID);
51595160
Mockito.verify(groupConfig).shareRecordLockDurationMs();
@@ -5175,13 +5176,13 @@ public void testScheduleAcquisitionLockTimeoutValueUpdatesSuccessfully() {
51755176
SharePartition sharePartition = SharePartitionBuilder.builder()
51765177
.withGroupConfigManager(groupConfigManager).build();
51775178

5178-
SharePartition.AcquisitionLockTimerTask timerTask1 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
5179+
AcquisitionLockTimerTask timerTask1 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
51795180

51805181
Mockito.verify(groupConfigManager, Mockito.times(2)).groupConfig(GROUP_ID);
51815182
Mockito.verify(groupConfig).shareRecordLockDurationMs();
51825183
assertEquals(expectedDurationMs1, timerTask1.delayMs);
51835184

5184-
SharePartition.AcquisitionLockTimerTask timerTask2 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
5185+
AcquisitionLockTimerTask timerTask2 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
51855186

51865187
Mockito.verify(groupConfigManager, Mockito.times(4)).groupConfig(GROUP_ID);
51875188
Mockito.verify(groupConfig, Mockito.times(2)).shareRecordLockDurationMs();
…/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.share.fetch;
18+
19+
/**
 * AcquisitionLockTimeoutHandler is a callback that is invoked when the acquisition lock held on a
 * range of records of a share partition times out.
 * <p>
 * The interface declares a single abstract method so that it can be implemented with a lambda
 * expression or a method reference.
 */
@FunctionalInterface
public interface AcquisitionLockTimeoutHandler {

    /**
     * Handles the acquisition lock timeout for a share partition.
     *
     * @param memberId the id of the member that requested the lock
     * @param firstOffset the first offset (inclusive) of the range whose acquisition lock timed out
     * @param lastOffset the last offset (inclusive) of the range whose acquisition lock timed out
     */
    void handle(String memberId, long firstOffset, long lastOffset);

}
…/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.share.fetch;
18+
19+
import org.apache.kafka.common.utils.Time;
20+
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
21+
import org.apache.kafka.server.util.timer.TimerTask;
22+
23+
/**
24+
* AcquisitionLockTimerTask is a timer task that is executed when the acquisition lock timeout is reached.
25+
* It releases the acquired records.
26+
*/
27+
public class AcquisitionLockTimerTask extends TimerTask {
28+
29+
private final long expirationMs;
30+
private final String memberId;
31+
private final long firstOffset;
32+
private final long lastOffset;
33+
private final AcquisitionLockTimeoutHandler timeoutHandler;
34+
private final SharePartitionMetrics sharePartitionMetrics;
35+
36+
public AcquisitionLockTimerTask(
37+
Time time,
38+
long delayMs,
39+
String memberId,
40+
long firstOffset,
41+
long lastOffset,
42+
AcquisitionLockTimeoutHandler timeoutHandler,
43+
SharePartitionMetrics sharePartitionMetrics
44+
) {
45+
super(delayMs);
46+
this.expirationMs = time.hiResClockMs() + delayMs;
47+
this.memberId = memberId;
48+
this.firstOffset = firstOffset;
49+
this.lastOffset = lastOffset;
50+
this.timeoutHandler = timeoutHandler;
51+
this.sharePartitionMetrics = sharePartitionMetrics;
52+
}
53+
54+
public long expirationMs() {
55+
return expirationMs;
56+
}
57+
58+
/**
59+
* The task is executed when the acquisition lock timeout is reached. The task releases the acquired records.
60+
*/
61+
@Override
62+
public void run() {
63+
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
64+
timeoutHandler.handle(memberId, firstOffset, lastOffset);
65+
}
66+
}

0 commit comments

Comments
 (0)