diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 559b39e0fb408..bda5acce236b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -279,6 +279,9 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) { } protected void readMoreEntries() { + if (state.equals(Terminated) || state.equals(Terminating)) { + return; + } // Acquire permits and check state of producer. InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema(); if (newInFlightTask == null) { @@ -963,7 +966,9 @@ public void beforeTerminate() { protected boolean hasPendingRead() { synchronized (inFlightTasks) { for (InFlightTask task : inFlightTasks) { - if (task.readPos != null && task.entries == null) { + // The purpose of calling "getReadPos" instead of calling "readPos" is to make the test + // "testReplicationTaskStoppedAfterTopicClosed" can counter the calling times of "readMoreEntries". + if (task.getReadPos() != null && task.entries == null) { // Skip the current reading if there is a pending cursor reading. return true; } @@ -971,4 +976,9 @@ protected boolean hasPendingRead() { } return false; } + + @VisibleForTesting + String getReplicatorId() { + return replicatorId; + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java index 68f50aa9e87f5..b23b4565e5f2f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java @@ -18,12 +18,17 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; @@ -34,6 +39,8 @@ import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask; import org.apache.pulsar.client.api.MessageId; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -66,6 +73,40 @@ private void createTopics() throws Exception { admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); } + @Test + public void testReplicationTaskStoppedAfterTopicClosed() throws Exception { + // Close a topic, which has enabled replication. + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + waitReplicatorStarted(topicName, pulsar2); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false) + .join().get(); + PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2); + admin1.topics().unload(topicName); + + // Inject a task into the "inFlightTasks" to calculate how many times the method "replicator.readMoreEntries" + // has been called. + AtomicInteger counter = new AtomicInteger(); + InFlightTask injectedTask = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId()); + injectedTask.setEntries(Collections.emptyList()); + InFlightTask spyTask = spy(injectedTask); + replicator.inFlightTasks.add(spyTask); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + counter.incrementAndGet(); + return invocationOnMock.callRealMethod(); + } + }).when(spyTask).getReadPos(); + + // Verify: there is no scheduled task to retry to read entries to replicate. + // Call "readMoreEntries" to make the issue happen. + replicator.readMoreEntries(); + Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10); + assertEquals(replicator.getState(), AbstractReplicator.State.Terminated); + assertTrue(counter.get() <= 1); + } + @Test public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception { log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");