Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread safe rocksdb segment handles #3734

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.collect.ImmutableMap;
import org.apache.tuweni.bytes.Bytes;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyDescriptor;
Expand All @@ -62,19 +63,19 @@
public class RocksDBColumnarKeyValueStorage
implements SegmentedKeyValueStorage<ColumnFamilyHandle> {

static {
RocksDbUtil.loadNativeLibrary();
}

private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class);
private static final String DEFAULT_COLUMN = "default";
private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";

static {
RocksDbUtil.loadNativeLibrary();
}

private final DBOptions options;
private final TransactionDBOptions txOptions;
private final TransactionDB db;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<String, ColumnFamilyHandle> columnHandlesByName;
private final Map<String, AtomicReference<ColumnFamilyHandle>> columnHandlesByName;
private final RocksDBMetrics metrics;
private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true);

Expand Down Expand Up @@ -127,14 +128,17 @@ public RocksDBColumnarKeyValueStorage(
Collectors.toMap(
segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName));

columnHandlesByName = new HashMap<>();
final ImmutableMap.Builder<String, AtomicReference<ColumnFamilyHandle>> builder =
ImmutableMap.builder();

for (ColumnFamilyHandle columnHandle : columnHandles) {
final String segmentName =
requireNonNullElse(
segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN);
columnHandlesByName.put(segmentName, columnHandle);
builder.put(segmentName, new AtomicReference<>(columnHandle));
}
columnHandlesByName = builder.build();

} catch (final RocksDBException e) {
throw new StorageException(e);
}
Expand All @@ -146,7 +150,8 @@ private BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfigura
}

@Override
public ColumnFamilyHandle getSegmentIdentifierByName(final SegmentIdentifier segment) {
public AtomicReference<ColumnFamilyHandle> getSegmentIdentifierByName(
final SegmentIdentifier segment) {
return columnHandlesByName.get(segment.getName());
}

Expand Down Expand Up @@ -198,30 +203,27 @@ public Set<byte[]> getAllKeysThat(
}

@Override
public ColumnFamilyHandle clear(final ColumnFamilyHandle segmentHandle) {
try {

var entry =
columnHandlesByName.entrySet().stream()
.filter(e -> e.getValue().equals(segmentHandle))
.findAny();

if (entry.isPresent()) {
String segmentName = entry.get().getKey();
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptor(
segmentHandle.getName(), segmentHandle.getDescriptor().getOptions());
db.dropColumnFamily(segmentHandle);
segmentHandle.close();
ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor);
columnHandlesByName.put(segmentName, newHandle);
return newHandle;
}

return segmentHandle;

} catch (final RocksDBException e) {
throw new StorageException(e);
public void clear(final ColumnFamilyHandle segmentHandle) {

var entry =
columnHandlesByName.values().stream().filter(e -> e.get().equals(segmentHandle)).findAny();

if (entry.isPresent()) {
AtomicReference<ColumnFamilyHandle> segmentHandleRef = entry.get();
segmentHandleRef.getAndUpdate(
oldHandle -> {
try {
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptor(
segmentHandle.getName(), segmentHandle.getDescriptor().getOptions());
db.dropColumnFamily(oldHandle);
ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor);
segmentHandle.close();
return newHandle;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
});
}
}

Expand All @@ -231,7 +233,9 @@ public void close() {
txOptions.close();
options.close();
tryDeleteOptions.close();
columnHandlesByName.values().forEach(ColumnFamilyHandle::close);
columnHandlesByName.values().stream()
.map(AtomicReference::get)
.forEach(ColumnFamilyHandle::close);
db.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.junit.Rule;
import org.junit.Test;
Expand All @@ -49,7 +49,10 @@ public void assertClear() throws Exception {
final byte[] val1 = bytesFromHexString("0FFF");
final byte[] val2 = bytesFromHexString("1337");
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
Supplier<ColumnFamilyHandle> segment = () -> store.getSegmentIdentifierByName(TestSegment.FOO);
AtomicReference<ColumnFamilyHandle> segment = store.getSegmentIdentifierByName(TestSegment.FOO);
KeyValueStorage duplicateSegmentRef =
new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store);

final Consumer<byte[]> insert =
value -> {
final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
Expand All @@ -59,18 +62,18 @@ public void assertClear() throws Exception {

// insert val:
insert.accept(val1);
final Optional<byte[]> result = store.get(segment.get(), key);
assertThat(result.orElse(null)).isEqualTo(val1);
assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val1);
assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val1);

// clear and assert empty:
store.clear(segment.get());
final Optional<byte[]> truncResult = store.get(segment.get(), key);
assertThat(truncResult).isEmpty();
assertThat(store.get(segment.get(), key)).isEmpty();
assertThat(duplicateSegmentRef.get(key)).isEmpty();

// insert into empty:
insert.accept(val2);
final Optional<byte[]> nextResult = store.get(segment.get(), key);
assertThat(nextResult.orElse(null)).isEqualTo(val2);
assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val2);
assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val2);

store.close();
}
Expand All @@ -81,13 +84,14 @@ public void twoSegmentsAreIndependent() throws Exception {

final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(
store.getSegmentIdentifierByName(TestSegment.BAR),
store.getSegmentIdentifierByName(TestSegment.BAR).get(),
bytesFromHexString("0001"),
bytesFromHexString("0FFF"));
tx.commit();

final Optional<byte[]> result =
store.get(store.getSegmentIdentifierByName(TestSegment.FOO), bytesFromHexString("0001"));
store.get(
store.getSegmentIdentifierByName(TestSegment.FOO).get(), bytesFromHexString("0001"));

assertThat(result).isEmpty();

Expand All @@ -100,8 +104,8 @@ public void canRemoveThroughSegmentIteration() throws Exception {
// properly
for (int i = 0; i < 50; i++) {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO);
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR);
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get();
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get();

final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(fooSegment, bytesOf(1), bytesOf(1));
Expand Down Expand Up @@ -144,8 +148,8 @@ public void canRemoveThroughSegmentIteration() throws Exception {
@Test
public void canGetThroughSegmentIteration() throws Exception {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO);
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR);
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get();
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get();

final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(fooSegment, bytesOf(1), bytesOf(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Closeable;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand All @@ -30,7 +31,7 @@
*/
public interface SegmentedKeyValueStorage<S> extends Closeable {

S getSegmentIdentifierByName(SegmentIdentifier segment);
AtomicReference<S> getSegmentIdentifierByName(SegmentIdentifier segment);

/**
* Get the value from the associated segment and key.
Expand Down Expand Up @@ -74,7 +75,7 @@ default boolean containsKey(final S segment, final byte[] key) throws StorageExc

Set<byte[]> getAllKeysThat(S segmentHandle, Predicate<byte[]> returnCondition);

S clear(S segmentHandle);
void clear(S segmentHandle);

/**
* Represents a set of changes to be committed atomically. A single transaction is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,48 @@
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
private S segmentHandle;
private final AtomicReference<S> segmentHandle;
private final SegmentedKeyValueStorage<S> storage;

public SegmentedKeyValueStorageAdapter(
final SegmentIdentifier segment, final SegmentedKeyValueStorage<S> storage) {
this.segmentHandle = storage.getSegmentIdentifierByName(segment);
segmentHandle = storage.getSegmentIdentifierByName(segment);
this.storage = storage;
}

@Override
public void clear() {
segmentHandle = storage.clear(segmentHandle);
storage.clear(segmentHandle.get());
}

@Override
public boolean containsKey(final byte[] key) throws StorageException {
return storage.containsKey(segmentHandle, key);
return storage.containsKey(segmentHandle.get(), key);
}

@Override
public Optional<byte[]> get(final byte[] key) throws StorageException {
return storage.get(segmentHandle, key);
return storage.get(segmentHandle.get(), key);
}

@Override
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
return storage.getAllKeysThat(segmentHandle, returnCondition);
return storage.getAllKeysThat(segmentHandle.get(), returnCondition);
}

@Override
public Stream<byte[]> streamKeys() {
return storage.streamKeys(segmentHandle);
return storage.streamKeys(segmentHandle.get());
}

@Override
public boolean tryDelete(final byte[] key) {
return storage.tryDelete(segmentHandle, key);
return storage.tryDelete(segmentHandle.get(), key);
}

@Override
Expand All @@ -77,12 +78,12 @@ public KeyValueStorageTransaction startTransaction() throws StorageException {

@Override
public void put(final byte[] key, final byte[] value) {
transaction.put(segmentHandle, key, value);
transaction.put(segmentHandle.get(), key, value);
}

@Override
public void remove(final byte[] key) {
transaction.remove(segmentHandle, key);
transaction.remove(segmentHandle.get(), key);
}

@Override
Expand Down