Skip to content

Commit 3ecbe8c

Browse files
poorbarcodelhotari
authored andcommitted
[fix][broker] Stop to retry to read entries if the replicator has terminated (#24880)
(cherry picked from commit 313ae97)
1 parent 74c366c commit 3ecbe8c

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,9 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) {
279279
}
280280

281281
protected void readMoreEntries() {
282+
if (state.equals(Terminated) || state.equals(Terminating)) {
283+
return;
284+
}
282285
// Acquire permits and check state of producer.
283286
InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
284287
if (newInFlightTask == null) {
@@ -963,12 +966,19 @@ public void beforeTerminate() {
963966
protected boolean hasPendingRead() {
964967
synchronized (inFlightTasks) {
965968
for (InFlightTask task : inFlightTasks) {
966-
if (task.readPos != null && task.entries == null) {
969+
// The purpose of calling "getReadPos" instead of calling "readPos" is to make the test
970+
// "testReplicationTaskStoppedAfterTopicClosed" can counter the calling times of "readMoreEntries".
971+
if (task.getReadPos() != null && task.entries == null) {
967972
// Skip the current reading if there is a pending cursor reading.
968973
return true;
969974
}
970975
}
971976
}
972977
return false;
973978
}
979+
980+
@VisibleForTesting
981+
String getReplicatorId() {
982+
return replicatorId;
983+
}
974984
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.mockito.Mockito.doAnswer;
2122
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.spy;
24+
import static org.testng.Assert.assertEquals;
25+
import static org.testng.Assert.assertTrue;
2226
import java.util.ArrayList;
2327
import java.util.Arrays;
2428
import java.util.Collections;
2529
import java.util.LinkedList;
2630
import java.util.List;
31+
import java.util.concurrent.atomic.AtomicInteger;
2732
import lombok.extern.slf4j.Slf4j;
2833
import org.apache.bookkeeper.mledger.Entry;
2934
import org.apache.bookkeeper.mledger.Position;
@@ -34,6 +39,8 @@
3439
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
3540
import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
3641
import org.apache.pulsar.client.api.MessageId;
42+
import org.mockito.invocation.InvocationOnMock;
43+
import org.mockito.stubbing.Answer;
3744
import org.testng.Assert;
3845
import org.testng.annotations.AfterClass;
3946
import org.testng.annotations.BeforeClass;
@@ -66,6 +73,40 @@ private void createTopics() throws Exception {
6673
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
6774
}
6875

76+
@Test
77+
public void testReplicationTaskStoppedAfterTopicClosed() throws Exception {
78+
// Close a topic, which has enabled replication.
79+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
80+
admin1.topics().createNonPartitionedTopic(topicName);
81+
waitReplicatorStarted(topicName, pulsar2);
82+
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
83+
.join().get();
84+
PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
85+
admin1.topics().unload(topicName);
86+
87+
// Inject a task into the "inFlightTasks" to calculate how many times the method "replicator.readMoreEntries"
88+
// has been called.
89+
AtomicInteger counter = new AtomicInteger();
90+
InFlightTask injectedTask = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId());
91+
injectedTask.setEntries(Collections.emptyList());
92+
InFlightTask spyTask = spy(injectedTask);
93+
replicator.inFlightTasks.add(spyTask);
94+
doAnswer(new Answer() {
95+
@Override
96+
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
97+
counter.incrementAndGet();
98+
return invocationOnMock.callRealMethod();
99+
}
100+
}).when(spyTask).getReadPos();
101+
102+
// Verify: there is no scheduled task to retry to read entries to replicate.
103+
// Call "readMoreEntries" to make the issue happen.
104+
replicator.readMoreEntries();
105+
Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10);
106+
assertEquals(replicator.getState(), AbstractReplicator.State.Terminated);
107+
assertTrue(counter.get() <= 1);
108+
}
109+
69110
@Test
70111
public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
71112
log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

0 commit comments

Comments
 (0)