Skip to content

Commit 46bd835

Browse files
committed
[fix][ml] Fix getNumberOfEntries may point to deleted ledger (#24852)
1 parent e18df9d commit 46bd835

File tree

3 files changed

+44
-6
lines changed

3 files changed

+44
-6
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3630,11 +3630,17 @@ long getNumberOfEntries(Range<PositionImpl> range) {
36303630
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
36313631

36323632
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
3633-
// If the 2 positions are in the same ledger
3634-
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3635-
count += fromIncluded ? 1 : 0;
3636-
count += toIncluded ? 1 : 0;
3637-
return count;
3633+
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
3634+
if (li != null) {
3635+
// If the 2 positions are in the same ledger
3636+
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3637+
count += fromIncluded ? 1 : 0;
3638+
count += toIncluded ? 1 : 0;
3639+
return count;
3640+
} else {
3641+
// if the ledgerId is not in the ledgers, it means it has been deleted
3642+
return 0;
3643+
}
36383644
} else {
36393645
long count = 0;
36403646
// If the from & to are pointing to different ledgers, then we need to :

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.testng.Assert.assertSame;
4343
import static org.testng.Assert.assertTrue;
4444
import static org.testng.Assert.fail;
45+
import com.google.common.collect.Range;
4546
import com.google.common.collect.Sets;
4647
import io.netty.buffer.ByteBuf;
4748
import io.netty.buffer.ByteBufAllocator;
@@ -2659,6 +2660,37 @@ public void testGetNumberOfEntriesInStorage() throws Exception {
26592660
assertEquals(length, numberOfEntries);
26602661
}
26612662

2663+
@Test
2664+
public void testGetNumberOfEntries() throws Exception {
2665+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
2666+
initManagedLedgerConfig(managedLedgerConfig);
2667+
managedLedgerConfig.setMaxEntriesPerLedger(5);
2668+
ManagedLedgerImpl managedLedger =
2669+
(ManagedLedgerImpl) factory.open("testGetNumberOfEntries", managedLedgerConfig);
2670+
// open cursor to prevent ledger to be deleted when ledger rollover
2671+
ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
2672+
int numberOfEntries = 10;
2673+
List<Position> positions = new ArrayList<>(numberOfEntries);
2674+
for (int i = 0; i < numberOfEntries; i++) {
2675+
positions.add(managedLedger.addEntry(("entry-" + i).getBytes(Encoding)));
2676+
}
2677+
Position mdPos = positions.get(numberOfEntries - 1);
2678+
Position rdPos = PositionFactory.create(mdPos.getLedgerId(), mdPos.getEntryId() + 1);
2679+
managedCursor.delete(positions);
2680+
// trigger ledger rollover and wait for the new ledger created
2681+
Awaitility.await().untilAsserted(() -> {
2682+
assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
2683+
});
2684+
managedLedger.rollCurrentLedgerIfFull();
2685+
Awaitility.await().untilAsserted(() -> {
2686+
assertEquals(managedLedger.getLedgersInfo().size(), 1);
2687+
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
2688+
});
2689+
2690+
long length = managedLedger.getNumberOfEntries(Range.closed(mdPos, rdPos));
2691+
assertEquals(length, 0);
2692+
}
2693+
26622694
@Test
26632695
public void testEstimatedBacklogSize() throws Exception {
26642696
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2281,7 +2281,7 @@ public void testAcknowledgeWithReconnection() throws Exception {
22812281

22822282
Awaitility.await().untilAsserted(() ->
22832283
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
2284-
5));
2284+
0));
22852285

22862286
// Make consumer reconnect to broker
22872287
admin.topics().unload(topicName);

0 commit comments

Comments
 (0)