Skip to content

Commit 0e9f0e3

Browse files
Technoboy-nodece
authored andcommitted
[fix][ml] Fix getNumberOfEntries may point to deleted ledger (apache#24852)
1 parent b2d220b commit 0e9f0e3

File tree

3 files changed

+44
-6
lines changed

3 files changed

+44
-6
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3725,11 +3725,17 @@ long getNumberOfEntries(Range<PositionImpl> range) {
37253725
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
37263726

37273727
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
3728-
// If the 2 positions are in the same ledger
3729-
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3730-
count += fromIncluded ? 1 : 0;
3731-
count += toIncluded ? 1 : 0;
3732-
return count;
3728+
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
3729+
if (li != null) {
3730+
// If the 2 positions are in the same ledger
3731+
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3732+
count += fromIncluded ? 1 : 0;
3733+
count += toIncluded ? 1 : 0;
3734+
return count;
3735+
} else {
3736+
// if the ledgerId is not in the ledgers, it means it has been deleted
3737+
return 0;
3738+
}
37333739
} else {
37343740
long count = 0;
37353741
// If the from & to are pointing to different ledgers, then we need to :

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.testng.Assert.assertSame;
4444
import static org.testng.Assert.assertTrue;
4545
import static org.testng.Assert.fail;
46+
import com.google.common.collect.Range;
4647
import com.google.common.collect.Sets;
4748
import io.netty.buffer.ByteBuf;
4849
import io.netty.buffer.ByteBufAllocator;
@@ -2663,6 +2664,37 @@ public void testGetNumberOfEntriesInStorage() throws Exception {
26632664
assertEquals(length, numberOfEntries);
26642665
}
26652666

2667+
@Test
2668+
public void testGetNumberOfEntries() throws Exception {
2669+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
2670+
initManagedLedgerConfig(managedLedgerConfig);
2671+
managedLedgerConfig.setMaxEntriesPerLedger(5);
2672+
ManagedLedgerImpl managedLedger =
2673+
(ManagedLedgerImpl) factory.open("testGetNumberOfEntries", managedLedgerConfig);
2674+
// open cursor to prevent ledger to be deleted when ledger rollover
2675+
ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
2676+
int numberOfEntries = 10;
2677+
List<Position> positions = new ArrayList<>(numberOfEntries);
2678+
for (int i = 0; i < numberOfEntries; i++) {
2679+
positions.add(managedLedger.addEntry(("entry-" + i).getBytes(Encoding)));
2680+
}
2681+
Position mdPos = positions.get(numberOfEntries - 1);
2682+
Position rdPos = PositionFactory.create(mdPos.getLedgerId(), mdPos.getEntryId() + 1);
2683+
managedCursor.delete(positions);
2684+
// trigger ledger rollover and wait for the new ledger created
2685+
Awaitility.await().untilAsserted(() -> {
2686+
assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
2687+
});
2688+
managedLedger.rollCurrentLedgerIfFull();
2689+
Awaitility.await().untilAsserted(() -> {
2690+
assertEquals(managedLedger.getLedgersInfo().size(), 1);
2691+
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
2692+
});
2693+
2694+
long length = managedLedger.getNumberOfEntries(Range.closed(mdPos, rdPos));
2695+
assertEquals(length, 0);
2696+
}
2697+
26662698
@Test
26672699
public void testEstimatedBacklogSize() throws Exception {
26682700
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2265,7 +2265,7 @@ public void testAcknowledgeWithReconnection() throws Exception {
22652265

22662266
Awaitility.await().untilAsserted(() ->
22672267
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
2268-
5));
2268+
0));
22692269

22702270
// Make consumer reconnect to broker
22712271
admin.topics().unload(topicName);

0 commit comments

Comments
 (0)