Skip to content

Commit d56758f

Browse files
codelipenghui and claude
authored and committed
[fix][ml] Fix ledger trimming race causing cursor to point to deleted ledgers (#24855)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 2afdc9d commit d56758f

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2593,7 +2593,8 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
25932593

25942594
public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
25952595
for (ManagedCursor cursor : cursors) {
2596-
Position lastAckedPosition = cursor.getMarkDeletedPosition();
2596+
Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null
2597+
? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition();
25972598
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
25982599
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
25992600
.map(Map.Entry::getValue).orElse(null);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4484,4 +4484,153 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
44844484
assertEquals(ml.currentLedgerEntries, 0);
44854485
});
44864486
}
4487+
4488+
/**
4489+
* Verifies that ledger trimming respects the persistent cursor position, not just the in-memory position.
4490+
*
4491+
* <p><b>Test Flow:</b>
4492+
* <ol>
4493+
* <li><b>Setup:</b> Create 60 entries across multiple ledgers (10 entries per ledger)
4494+
* <li><b>Initial Acks:</b> Delete entries 0, 5-9 and wait for persistence
4495+
* <ul><li>Persistent position: entry 0</li><li>In-memory position: entry 0</li></ul>
4496+
* <li><b>Inject Delay:</b> Add 30-second delay to BookKeeper writes (simulates slow ZK/BK)
4497+
* <li><b>Delayed Acks:</b> Asynchronously delete entries 1-4
4498+
* <ul><li>Persistent position: entry 0 (delayed)</li><li>In-memory position: entry 9</li></ul>
4499+
* <li><b>Pre-Trim Sync:</b> Call {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()}
4500+
* <li><b>Trigger Trim:</b> Start ledger trimming process
4501+
* <li><b>Verify:</b> First ledger is preserved because persistent position (entry 0) still points to it
4502+
* </ol>
4503+
*
4504+
* <p><b>Success Criteria:</b>
4505+
* The first ledger must NOT be deleted, preventing the cursor from pointing to a non-existent
4506+
* ledger after topic reload. This avoids negative backlog calculations.
4507+
*
4508+
* <p><b>What This Tests:</b>
4509+
* Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} correctly uses the
4510+
* persistent cursor position (not in-memory) when determining which ledgers are safe to trim.
4511+
*/
4512+
@Test
4513+
public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
4514+
final String ledgerName = "testCursorPointsToDeletedLedgerAfterTrimAndReload";
4515+
final String cursorName = "test-cursor";
4516+
4517+
// ===== SETUP: Create managed ledger with small ledgers =====
4518+
ManagedLedgerConfig config = new ManagedLedgerConfig();
4519+
config.setMaxEntriesPerLedger(10);
4520+
4521+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, config);
4522+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
4523+
4524+
// ===== PHASE 1: Write entries to create multiple ledgers =====
4525+
int totalEntries = 60;
4526+
log.info("=== PHASE 1: Writing {} entries to create multiple ledgers ===", totalEntries);
4527+
for (int i = 0; i < totalEntries; i++) {
4528+
Position pos = ledger.addEntry(("message-" + i).getBytes());
4529+
log.info("Added entry: {}", pos);
4530+
}
4531+
4532+
List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
4533+
log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
4534+
ledgersAfterWrite.stream()
4535+
.map(l -> String.format("L%d(%d entries)", l.getLedgerId(), l.getEntries()))
4536+
.toArray());
4537+
4538+
assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5 ledgers");
4539+
long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
4540+
4541+
// ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait for persistence =====
4542+
log.info("=== PHASE 2: Acknowledging initial entries in first ledger {} ===", firstLedgerId);
4543+
List<Entry> entries = cursor.readEntries(10);
4544+
4545+
// Delete entries 5-9 first (out of order)
4546+
log.info("Deleting entries 5-9");
4547+
for (int i = 5; i < 10; i++) {
4548+
cursor.delete(entries.get(i).getPosition());
4549+
}
4550+
4551+
// Delete entry 0, which advances mark-delete position
4552+
log.info("Deleting entry 0 - this advances mark-delete position");
4553+
cursor.delete(entries.get(0).getPosition());
4554+
4555+
// Verify in-memory cursor position
4556+
Position initialMarkDelete = cursor.getMarkDeletedPosition();
4557+
assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
4558+
"Mark-delete should be in first ledger");
4559+
assertEquals(initialMarkDelete.getEntryId(), entries.get(0).getEntryId(),
4560+
"Mark-delete should be at entry 0");
4561+
4562+
// Wait for this position to be persisted
4563+
log.info("Waiting for initial mark-delete position to persist: {}", initialMarkDelete);
4564+
Awaitility.await().untilAsserted(() -> {
4565+
assertEquals(cursor.getPersistentMarkDeletedPosition(), initialMarkDelete,
4566+
"Persistent position should catch up to in-memory position");
4567+
});
4568+
log.info("Initial position persisted successfully");
4569+
4570+
// ===== PHASE 3: Inject delay to simulate slow persistence =====
4571+
long delay = 30;
4572+
log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
4573+
delay);
4574+
bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
4575+
4576+
// ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence will be delayed) =====
4577+
log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will be delayed) ===");
4578+
for (int i = 1; i < 5; i++) {
4579+
final int index = i;
4580+
cursor.asyncDelete(entries.get(i).getPosition(), new AsyncCallbacks.DeleteCallback() {
4581+
@Override
4582+
public void deleteComplete(Object ctx) {
4583+
log.info("Entry {} deletion completed", index);
4584+
}
4585+
4586+
@Override
4587+
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
4588+
log.error("Entry {} deletion failed", index, exception);
4589+
}
4590+
}, null);
4591+
}
4592+
4593+
// Verify in-memory position has advanced to entry 9
4594+
Position newMarkDelete = cursor.getMarkDeletedPosition();
4595+
assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
4596+
"Mark-delete should still be in first ledger");
4597+
assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
4598+
"Mark-delete should have advanced to entry 9 (in-memory)");
4599+
log.info("In-memory mark-delete position: {}", newMarkDelete);
4600+
4601+
// ===== PHASE 5: Update cursor before trimming (important synchronization point) =====
4602+
log.info("=== PHASE 5: Calling maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
4603+
ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
4604+
4605+
// ===== PHASE 6: Trigger ledger trimming =====
4606+
log.info("=== PHASE 6: Triggering ledger trimming ===");
4607+
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
4608+
ledger.trimConsumedLedgersInBackground(trimFuture);
4609+
trimFuture.get();
4610+
log.info("Trimming completed");
4611+
4612+
// ===== VERIFICATION: Ledgers should NOT be trimmed =====
4613+
log.info("=== VERIFICATION ===");
4614+
4615+
// Persistent position should still be at old position (entry 0)
4616+
Position persistentPosition = cursor.getPersistentMarkDeletedPosition();
4617+
assertEquals(persistentPosition, initialMarkDelete,
4618+
"Persistent position should not have advanced (delayed)");
4619+
log.info("Persistent mark-delete position (as expected): {}", persistentPosition);
4620+
log.info("In-memory mark-delete position: {}", newMarkDelete);
4621+
4622+
// First ledger should still exist (not trimmed)
4623+
Awaitility.await().untilAsserted(() -> {
4624+
long firstRemainingLedger = ledger.getFirstPosition().getLedgerId();
4625+
assertEquals(firstRemainingLedger, ledgersAfterWrite.get(0).getLedgerId(),
4626+
"First ledger should NOT be trimmed because persistent cursor position "
4627+
+ "is still pointing to it (entry 0)");
4628+
});
4629+
log.info("SUCCESS: First ledger {} was correctly preserved", firstLedgerId);
4630+
4631+
// ===== CLEANUP =====
4632+
entries.forEach(Entry::release);
4633+
cursor.close();
4634+
ledger.close();
4635+
}
44874636
}

0 commit comments

Comments
 (0)