Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
*/
package org.apache.bookkeeper.bookie.storage.ldb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
Expand All @@ -48,7 +50,8 @@ public class EntryLocationIndex implements Closeable {
private final KeyValueStorage locationsDb;
private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build();
private final EntryLocationIndexStats stats;
private boolean isCompacting;
@VisibleForTesting
final AtomicBoolean compacting = new AtomicBoolean(false);

public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
StatsLogger stats) throws IOException {
Expand All @@ -67,7 +70,19 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora

@Override
public void close() throws IOException {
log.info("Closing EntryLocationIndex");
while (!compacting.compareAndSet(false, true)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This waiting could result in the system being stuck here indefinitely, or it could take an exceptionally long time to get stuck at this step.

Should we add a maximum waiting time, or do something else, such as modifying RocksDB operations?

Copy link
Contributor Author

@AnonHxy AnonHxy Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think so. Closing a DB which in compaction status may triger core dumps or other unexpected error. If closing the DB cost too long time, I think we'd better find out why or kill -9 if we need.

The handling approach here is similar to the procedure below:

while (!compacting.compareAndSet(false, true)) {
// Wait till the thread stops compacting
Thread.sleep(100);

// Wait till the locationsDb stops compacting
log.info("Waiting the locationsDb stops compacting");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
locationsDb.close();
log.info("Closed EntryLocationIndex");
}

public long getLocation(long ledgerId, long entryId) throws IOException {
Expand Down Expand Up @@ -203,15 +218,17 @@ public String getEntryLocationDBPath() {

public void compact() throws IOException {
try {
isCompacting = true;
if (!compacting.compareAndSet(false, true)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question:

Is the isCompacting syntax the root cause of this problem?
Or is it simply a matter of rewriting the method more efficiently, thus ruling out issues with the isCompacting setting?

Copy link
Contributor Author

@AnonHxy AnonHxy Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It's the root cause. We should cancel the compacting if we have closed the DB, or we should delay closing the DB if we have already been in compaction status. So here we need an atomic variable to serve as a flag for the DB status. @StevenLuMT

return;
}
locationsDb.compact();
} finally {
isCompacting = false;
compacting.set(false);
}
}

/**
 * @return whether a compaction of the locations db is currently in progress
 *         (or a close has claimed the flag).
 */
public boolean isCompacting() {
    return compacting.get();
}

public void removeOffsetFromDeletedLedgers() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@
package org.apache.bookkeeper.bookie.storage.ldb;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.jupiter.api.Timeout;

/**
* Unit test for {@link EntryLocationIndex}.
Expand Down Expand Up @@ -231,4 +236,39 @@ public void testEntryIndexLookupLatencyStats() throws IOException {
assertEquals(1, lookupEntryLocationOpStats.getFailureCount());
assertEquals(1, lookupEntryLocationOpStats.getSuccessCount());
}

// Verifies that close() blocks while a compaction is (apparently) in
// progress and completes only once the compacting flag is released.
// NOTE(review): @Timeout is org.junit.jupiter.api.Timeout while @Test is
// org.junit.Test (JUnit 4) -- the Jupiter timeout is likely ignored under a
// JUnit 4 runner; confirm which JUnit version actually executes this class.
// NOTE(review): an IOException from idx.close() is rethrown as a
// RuntimeException inside the spawned thread and silently kills it, so the
// test would then fail on the Awaitility timeout rather than surface the
// root cause -- consider capturing the exception for the main thread.
@Test
@Timeout(60)
public void testClose() throws Exception {
File tmpDir = File.createTempFile("bkTest", ".dir");
tmpDir.delete();
tmpDir.mkdir();
tmpDir.deleteOnExit();

EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);

// mock EntryLocationIndex is compacting
idx.compacting.set(true);
AtomicBoolean closeFlag = new AtomicBoolean(false);
AtomicLong closeEscapedMills = new AtomicLong(0);
new Thread(() -> {
try {
long start = System.currentTimeMillis();
idx.close();
closeEscapedMills.set(System.currentTimeMillis() - start);
closeFlag.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
// Give close() ample time to prove it is blocked on the compacting flag.
long sleepMills = 10_000;
Thread.sleep(sleepMills);
assertFalse(closeFlag.get());

// mock EntryLocationIndex finish compacting
idx.compacting.set(false);
// close() should now proceed; assert it was blocked at least sleepMills.
Awaitility.await().untilAsserted(() -> assertTrue(closeFlag.get()));
assertTrue(closeEscapedMills.get() >= sleepMills);
}
}
Loading