Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
Expand Down Expand Up @@ -82,17 +83,15 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
return maxEntries;
}

// Adjust the read position to ensure it falls within the valid range of available ledgers.
// This handles special cases such as EARLIEST and LATEST positions by resetting them
// to the first available ledger or the last active ledger, respectively.
if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
readPosition = PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
} else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
readPosition =
PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
} else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
readPosition = PositionFactory.create(ledgersInfo.firstKey(), 0);
if (ledgersInfo.isEmpty()) {
return 1;
}

try {
readPosition = adjustReadPosition(readPosition, ledgersInfo, lastLedgerId, lastLedgerTotalEntries);
} catch (NoSuchElementException e) {
// there was a race condition where ledgersInfo became empty just before adjustReadPosition was called
return 1;
}

long estimatedEntryCount = 0;
Expand Down Expand Up @@ -183,4 +182,28 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
// Ensure at least one entry is always returned as the result
return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
}

private static Position adjustReadPosition(Position readPosition,
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgersInfo,
Long lastLedgerId, long lastLedgerTotalEntries) {
// Adjust the read position to ensure it falls within the valid range of available ledgers.
// This handles special cases such as EARLIEST and LATEST positions by resetting them
// to the first available ledger or the last active ledger, respectively.
if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
return PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
}
long lastKey = ledgersInfo.lastKey();
if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
if (lastEntry != null && lastEntry.getKey() == lastKey) {
return PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
}
}
long firstKey = ledgersInfo.firstKey();
if (readPosition.getLedgerId() < firstKey) {
return PositionFactory.create(firstKey, 0);
}
return readPosition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -289,4 +292,41 @@ public void testMaxSizeIsLongMAX_VALUE() {
int result = estimateEntryCountByBytesSize(Long.MAX_VALUE);
assertEquals(result, maxEntries);
}

@Test
public void testNoLedgers() {
readPosition = PositionFactory.EARLIEST;
// remove all ledgers from ledgersInfo
ledgersInfo.clear();
int result = estimateEntryCountByBytesSize(5_000_000);
// expect that result is 1 because the estimation couldn't be done
assertEquals(result, 1);
}

@Test
public void testNoLedgersRaceFirstKey() {
readPosition = PositionFactory.EARLIEST;
// remove all ledgers from ledgersInfo
ledgersInfo = mock(NavigableMap.class);
when(ledgersInfo.isEmpty()).thenReturn(false);
when(ledgersInfo.firstKey()).thenThrow(NoSuchElementException.class);
when(ledgersInfo.lastKey()).thenReturn(1L);
int result = estimateEntryCountByBytesSize(5_000_000);
// expect that result is 1 because the estimation couldn't be done
assertEquals(result, 1);
}

@Test
public void testNoLedgersRaceLastKey() {
readPosition = PositionFactory.EARLIEST;
// remove all ledgers from ledgersInfo
ledgersInfo = mock(NavigableMap.class);
lastLedgerId = null;
when(ledgersInfo.isEmpty()).thenReturn(false);
when(ledgersInfo.firstKey()).thenReturn(1L);
when(ledgersInfo.lastKey()).thenThrow(NoSuchElementException.class);
int result = estimateEntryCountByBytesSize(5_000_000);
// expect that result is 1 because the estimation couldn't be done
assertEquals(result, 1);
}
}
Loading