Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][offloaders] Automatically evict Offloaded Ledgers from memory #19783

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,10 @@ managedLedgerMaxLedgerRolloverTimeMinutes=240
# Disable rollover with value 0 (Default value 0)
managedLedgerInactiveLedgerRolloverTimeSeconds=0

# Time in seconds after which an offloaded ledger that has not been accessed is evicted from memory
# Disable eviction with value 0 (Default value 600)
managedLedgerInactiveOffloadedLedgerEvictionTimeSeconds=600

# Maximum ledger size before triggering a rollover for a topic (MB)
managedLedgerMaxSizePerLedgerMbytes=2048

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ManagedLedgerConfig {
private ManagedLedgerInterceptor managedLedgerInterceptor;
private Map<String, String> properties;
private int inactiveLedgerRollOverTimeMs = 0;
private int inactiveOffloadedLedgerEvictionTimeMs = 0;
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
Expand Down Expand Up @@ -691,6 +692,14 @@ public void setInactiveLedgerRollOverTime(int inactiveLedgerRollOverTimeMs, Time
this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs);
}

/**
 * Returns the inactivity threshold, in milliseconds, used to decide when an offloaded ledger
 * may be evicted.
 *
 * @return the configured threshold in milliseconds; the default field value is {@code 0}
 */
public int getInactiveOffloadedLedgerEvictionTimeMs() {
return inactiveOffloadedLedgerEvictionTimeMs;
}

/**
 * Sets the inactivity threshold used to decide when an offloaded ledger may be evicted.
 *
 * <p>The value is converted to milliseconds before being stored. If the converted value does
 * not fit in an {@code int} it is saturated to the {@code int} range instead of wrapping
 * around, so a very large duration can never turn into a negative (or small) threshold.
 *
 * @param inactiveOffloadedLedgerEvictionTimeMs the duration, expressed in {@code unit}
 *        (despite the parameter name it is only milliseconds when {@code unit} is
 *        {@link TimeUnit#MILLISECONDS})
 * @param unit the time unit of the supplied duration
 */
public void setInactiveOffloadedLedgerEvictionTimeMs(int inactiveOffloadedLedgerEvictionTimeMs, TimeUnit unit) {
    long millis = unit.toMillis(inactiveOffloadedLedgerEvictionTimeMs);
    // A plain (int) cast would silently wrap for durations above ~24.8 days worth of millis.
    this.inactiveOffloadedLedgerEvictionTimeMs =
            (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
}

/**
* Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other cursors'
* backlog reads. (Default = 0, broker will not cache backlog reads)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

/**
 * Marker interface for ledger handles that represent offloaded data.
 */
public interface OffloadedLedgerHandle {

/**
 * Returns the time at which this handle was last accessed.
 *
 * <p>The default implementation returns {@code -1}, a negative value meaning that the
 * handle does not report a last-access time.
 *
 * @return the last access timestamp, or {@code -1} if the implementation does not track it
 */
default long lastAccessTimestamp() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
Expand Down Expand Up @@ -1924,6 +1925,8 @@ CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
// TODO: improve this to load ledger offloader by driver name recorded in metadata
Map<String, String> offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info);
offloadDriverMetadata.put("ManagedLedgerName", name);
log.info("[{}] Opening ledger {} from offload driver {} with uid {}", name, ledgerId,
config.getLedgerOffloader().getOffloadDriverName(), uid);
openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid,
offloadDriverMetadata);
} else {
Expand All @@ -1949,11 +1952,16 @@ CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
/**
 * Removes the read handle of the given ledger from {@code ledgerCache} and closes it
 * asynchronously.
 *
 * <p>Nothing happens when no handle is cached for the ledger. Failures while opening or
 * closing the handle are logged and otherwise ignored.
 *
 * @param ledgerId id of the ledger whose cached read handle must be released
 */
void invalidateReadHandle(long ledgerId) {
    CompletableFuture<ReadHandle> rhf = ledgerCache.remove(ledgerId);
    if (rhf != null) {
        // Close the handle exactly once. thenCompose (rather than thenAccept) chains the
        // future returned by closeAsync(), so an asynchronous close failure also reaches
        // the exceptionally() handler below instead of being dropped.
        rhf.thenCompose(r -> {
            if (r instanceof OffloadedLedgerHandle) {
                log.info("[{}] Closing ledger {} from offload driver {}", name, ledgerId,
                        config.getLedgerOffloader().getOffloadDriverName());
            }
            return r.closeAsync();
        }).exceptionally(ex -> {
            log.warn("[{}] Failed to close Ledger ReadHandle {}:", name, ledgerId, ex);
            return null;
        });
    }
}

Expand Down Expand Up @@ -2560,7 +2568,45 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
internalTrimLedgers(false, promise);
}

/**
 * Releases the cached read handles of offloaded ledgers that have been idle for at least
 * the configured inactivity threshold.
 *
 * <p>Only handles whose open future completed successfully, that implement
 * {@link OffloadedLedgerHandle} and that report a non-negative last-access timestamp are
 * considered. Eviction is disabled when the configured threshold is not positive.
 *
 * @return the ids of the ledgers whose read handles were invalidated (possibly empty)
 */
@VisibleForTesting
List<Long> internalEvictOffloadedLedgers() {
    final int evictionThresholdMs = config.getInactiveOffloadedLedgerEvictionTimeMs();
    if (evictionThresholdMs <= 0) {
        return Collections.emptyList();
    }

    final List<Long> evictable = new ArrayList<>();
    final long currentTimeMs = clock.millis();

    // First pass: only collect candidates; the cache is modified after the iteration.
    ledgerCache.forEach((id, handleFuture) -> {
        if (!handleFuture.isDone() || handleFuture.isCompletedExceptionally()) {
            return;
        }
        ReadHandle handle = handleFuture.join();
        if (!(handle instanceof OffloadedLedgerHandle)) {
            return;
        }
        long lastAccess = ((OffloadedLedgerHandle) handle).lastAccessTimestamp();
        if (lastAccess < 0) {
            // The handle does not track its last access time.
            return;
        }
        long idleMs = currentTimeMs - lastAccess;
        if (idleMs >= evictionThresholdMs) {
            log.info("[{}] Offloaded ledger {} can be released ({} ms elapsed since last access)",
                    name, id, idleMs);
            evictable.add(id);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Offloaded ledger {} cannot be released ({} ms elapsed since last access)",
                    name, id, idleMs);
        }
    });

    // Second pass: drop the collected handles from the cache and close them.
    for (Long evictedId : evictable) {
        invalidateReadHandle(evictedId);
    }
    return evictable;
}

void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {

internalEvictOffloadedLedgers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see internalEvictOffloadedLedgers being called anywhere but tests, could you explain how it will be triggered?

Copy link
Member

@lhotari lhotari Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode It gets called when internalTrimLedgers is called.


if (!factory.isMetadataServiceAvailable()) {
// Defer trimming of ledger if we cannot connect to metadata service
promise.completeExceptionally(new MetaStoreException("Metadata service is not available"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Verifies that {@code ManagedLedgerImpl.internalEvictOffloadedLedgers()} only releases
 * offloaded ledger handles whose last access is older than the configured threshold.
 */
public class OffloadEvictUnusedLedgersTest extends MockedBookKeeperTestCase {
private static final Logger log = LoggerFactory.getLogger(OffloadEvictUnusedLedgersTest.class);

/**
 * Writes 25 entries (10 per ledger), offloads them, reads them back, then back-dates the
 * last-access timestamp of a single cached handle and checks that exactly that ledger is evicted.
 */
@Test
public void testEvictUnusedLedgers() throws Exception {
OffloadPrefixReadTest.MockLedgerOffloader offloader =
new OffloadPrefixReadTest.MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
// eviction threshold: 10 seconds, passed in milliseconds
int inactiveOffloadedLedgerEvictionTimeMs = 10000;
config.setInactiveOffloadedLedgerEvictionTimeMs(inactiveOffloadedLedgerEvictionTimeMs, TimeUnit.MILLISECONDS);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger_evict", config);

// no evict when no offloaded ledgers
assertTrue(ledger.internalEvictOffloadedLedgers().isEmpty());

// 25 entries with at most 10 entries per ledger are expected to produce 3 ledgers
int i = 0;
for (; i < 25; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
assertEquals(ledger.getLedgersInfoAsList().size(), 3);

ledger.offloadPrefix(ledger.getLastConfirmedEntry());

// the ledgers flagged as completely offloaded must be exactly those the mock offloader received
assertEquals(ledger.getLedgersInfoAsList().size(), 3);
assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());

// ledgers should be marked as offloaded
// NOTE(review): the boolean returned by allMatch is discarded, so this line asserts nothing.
// Wrapping it in assertTrue may fail for a ledger that was not offloaded - TODO confirm intent.
ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext());

// no evict when no offloaded ledgers are marked as inactive
assertTrue(ledger.internalEvictOffloadedLedgers().isEmpty());

// read everything back; ledgerCache is asserted to be non-empty afterwards
ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
int j = 0;
for (Entry e : cursor.readEntries(25)) {
assertEquals(new String(e.getData()), "entry-" + j++);
}
cursor.close();

// set last access time to be 2x inactiveOffloadedLedgerEvictionTimeMs
// only the first cached handle visited is back-dated; its ledger id is remembered in `first`
AtomicLong first = new AtomicLong(-1);
assertTrue(!ledger.ledgerCache.isEmpty());
ledger.ledgerCache.forEach((id, l) -> {
if (first.compareAndSet(-1, id)) {
OffloadPrefixReadTest.MockOffloadReadHandle handle =
(OffloadPrefixReadTest.MockOffloadReadHandle) l.join();
handle.setLastAccessTimestamp(System.currentTimeMillis() - inactiveOffloadedLedgerEvictionTimeMs * 2);
}
});
assertNotEquals(first.get(), -1L);

// exactly the back-dated ledger must be reported as evicted
List<Long> evicted = ledger.internalEvictOffloadedLedgers();
assertEquals(evicted.size(), 1);
assertEquals(first.get(), evicted.get(0).longValue());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.SneakyThrows;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
Expand All @@ -54,6 +57,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.net.BookieId;
Expand Down Expand Up @@ -212,6 +216,10 @@ static class MockLedgerOffloader implements LedgerOffloader {
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

/**
 * Collects the ids of all read handles currently held in {@code offloads}.
 *
 * @return the set of ledger ids of the stored handles
 */
Set<Long> offloadedLedgers() {
    Set<Long> ledgerIds = offloads.values()
            .stream()
            .map(handle -> handle.getId())
            .collect(Collectors.toSet());
    return ledgerIds;
}


@Override
public String getOffloadDriverName() {
Expand Down Expand Up @@ -272,10 +280,11 @@ public CompletableFuture<Void> closeAsync() {
}
}

static class MockOffloadReadHandle implements ReadHandle {
static class MockOffloadReadHandle implements ReadHandle, OffloadedLedgerHandle {
final long id;
final List<ByteBuf> entries = new ArrayList();
final LedgerMetadata metadata;
long lastAccessTimestamp = System.currentTimeMillis();

MockOffloadReadHandle(ReadHandle toCopy) throws Exception {
id = toCopy.getId();
Expand Down Expand Up @@ -353,6 +362,15 @@ private <T> CompletableFuture<T> unsupported() {
future.completeExceptionally(new UnsupportedOperationException());
return future;
}

/**
 * Returns the last-access timestamp currently stored in this mock handle.
 */
@Override
public long lastAccessTimestamp() {
return lastAccessTimestamp;
}

/**
 * Overwrites the last-access timestamp reported by this mock handle.
 *
 * @param lastAccessTimestamp the timestamp that {@code lastAccessTimestamp()} will return
 */
public void setLastAccessTimestamp(long lastAccessTimestamp) {
this.lastAccessTimestamp = lastAccessTimestamp;
}
}

static class MockMetadata implements LedgerMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,14 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Time to evict inactive offloaded ledger for inactive topic. "
+ "Disable eviction with value 0 (Default value 600)"
)
private int managedLedgerInactiveOffloadedLedgerEvictionTimeSeconds = 600;

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,10 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setInactiveOffloadedLedgerEvictionTimeMs(
serviceConfig.getManagedLedgerInactiveOffloadedLedgerEvictionTimeSeconds(),
TimeUnit.SECONDS);

managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());
managedLedgerConfig.setMinimumBacklogCursorsForCaching(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
Expand All @@ -50,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobStoreBackedReadHandleImpl implements ReadHandle {
public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedgerHandle {
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
private static final int CACHE_TTL_SECONDS =
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 30 * 60);
Expand All @@ -73,6 +74,8 @@ enum State {

private State state = null;

// Updated by touch() from tasks running on the read executor and read through
// lastAccessTimestamp() by callers on other threads, so it must be volatile: a plain
// long gives neither cross-thread visibility nor guaranteed atomic 64-bit access.
private volatile long lastAccessTimestamp = System.currentTimeMillis();

private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream, ExecutorService executor) {
this.ledgerId = ledgerId;
Expand Down Expand Up @@ -118,6 +121,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
touch();
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
ledgerId, firstEntry, lastEntry);
Expand Down Expand Up @@ -202,6 +206,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}

private void seekToEntry(long nextExpectedId) throws IOException {
touch();
Long knownOffset = entryOffsets.getIfPresent(nextExpectedId);
if (knownOffset != null) {
inputStream.seek(knownOffset);
Expand Down Expand Up @@ -299,4 +304,13 @@ public static ReadHandle open(ScheduledExecutorService executor,
/**
 * Returns the current value of the {@code state} field (package-private accessor).
 */
State getState() {
return this.state;
}

/**
 * Returns the wall-clock time, in milliseconds, recorded by the most recent {@code touch()},
 * or the time at which the field was initialized if the handle has not been touched yet.
 */
@Override
public long lastAccessTimestamp() {
return lastAccessTimestamp;
}

/**
 * Records the current wall-clock time as the last access time of this handle.
 */
private void touch() {
lastAccessTimestamp = System.currentTimeMillis();
}
}
Loading