Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2871] Columnar rocksdb (#1599)
Browse files Browse the repository at this point in the history
* Columnated storage to allow for iteration over world state
* change MetricsCategory to PantheonMetricsCategory
* consistency renaming of kvstores
  • Loading branch information
RatanRSur authored Jun 27, 2019
1 parent 3dcc0c4 commit 69cee79
Show file tree
Hide file tree
Showing 26 changed files with 982 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import tech.pegasys.pantheon.ethereum.mainnet.TransactionProcessor;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator.TransactionInvalidReason;
import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import tech.pegasys.pantheon.ethereum.vm.TestBlockchain;
import tech.pegasys.pantheon.ethereum.worldstate.DefaultMutableWorldState;
import tech.pegasys.pantheon.metrics.MetricsSystem;
Expand Down Expand Up @@ -528,6 +528,6 @@ private TransactionReceipt createReceipt(

private DefaultMutableWorldState inMemoryWorldState() {
return new DefaultMutableWorldState(
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()));
new WorldStateKeyValueStorage(new InMemoryKeyValueStorage()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import tech.pegasys.pantheon.ethereum.core.WorldUpdater;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import tech.pegasys.pantheon.ethereum.worldstate.DefaultMutableWorldState;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand Down Expand Up @@ -119,8 +119,7 @@ private static void writeAccountsTo(

private static Hash calculateGenesisStateHash(final List<GenesisAccount> genesisAccounts) {
final MutableWorldState worldState =
new DefaultMutableWorldState(
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()));
new DefaultMutableWorldState(new WorldStateKeyValueStorage(new InMemoryKeyValueStorage()));
writeAccountsTo(worldState, genesisAccounts);
return worldState.rootHash();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import java.util.List;
import java.util.Optional;

public class PrivateKeyValueStorage implements PrivateTransactionStorage {
public class PrivateTransactionKeyValueStorage implements PrivateTransactionStorage {

private final KeyValueStorage keyValueStorage;

private static final BytesValue EVENTS_KEY_SUFFIX = BytesValue.of("EVENTS".getBytes(UTF_8));
private static final BytesValue OUTPUT_KEY_SUFFIX = BytesValue.of("OUTPUT".getBytes(UTF_8));

public PrivateKeyValueStorage(final KeyValueStorage keyValueStorage) {
public PrivateTransactionKeyValueStorage(final KeyValueStorage keyValueStorage) {
this.keyValueStorage = keyValueStorage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import tech.pegasys.pantheon.ethereum.chain.BlockchainStorage;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import tech.pegasys.pantheon.ethereum.privacy.PrivateKeyValueStorage;
import tech.pegasys.pantheon.ethereum.privacy.PrivateStateKeyValueStorage;
import tech.pegasys.pantheon.ethereum.privacy.PrivateStateStorage;
import tech.pegasys.pantheon.ethereum.privacy.PrivateTransactionKeyValueStorage;
import tech.pegasys.pantheon.ethereum.privacy.PrivateTransactionStorage;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
Expand All @@ -27,35 +27,60 @@

public class KeyValueStorageProvider implements StorageProvider {

private final KeyValueStorage keyValueStorage;
private final KeyValueStorage blockchainStorage;
private final KeyValueStorage worldStateStorage;
private final KeyValueStorage privateTransactionStorage;
private final KeyValueStorage privateStateStorage;
private final KeyValueStorage pruningStorage;

public KeyValueStorageProvider(final KeyValueStorage keyValueStorage) {
this.keyValueStorage = keyValueStorage;
this(keyValueStorage, keyValueStorage, keyValueStorage, keyValueStorage, keyValueStorage);
}

public KeyValueStorageProvider(
final KeyValueStorage blockchainStorage,
final KeyValueStorage worldStateStorage,
final KeyValueStorage privateTransactionStorage,
final KeyValueStorage privateStateStorage,
final KeyValueStorage pruningStorage) {
this.blockchainStorage = blockchainStorage;
this.worldStateStorage = worldStateStorage;
this.privateTransactionStorage = privateTransactionStorage;
this.privateStateStorage = privateStateStorage;
this.pruningStorage = pruningStorage;
}

@Override
public BlockchainStorage createBlockchainStorage(final ProtocolSchedule<?> protocolSchedule) {
return new KeyValueStoragePrefixedKeyBlockchainStorage(
keyValueStorage, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
blockchainStorage, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
}

@Override
public WorldStateStorage createWorldStateStorage() {
return new KeyValueStorageWorldStateStorage(keyValueStorage);
return new WorldStateKeyValueStorage(worldStateStorage);
}

@Override
public PrivateTransactionStorage createPrivateTransactionStorage() {
return new PrivateKeyValueStorage(keyValueStorage);
return new PrivateTransactionKeyValueStorage(privateTransactionStorage);
}

@Override
public PrivateStateStorage createPrivateStateStorage() {
return new PrivateStateKeyValueStorage(keyValueStorage);
return new PrivateStateKeyValueStorage(privateStateStorage);
}

public KeyValueStorage createPruningStorage() {
return pruningStorage;
}

@Override
public void close() throws IOException {
keyValueStorage.close();
blockchainStorage.close();
worldStateStorage.close();
privateTransactionStorage.close();
privateStateStorage.close();
pruningStorage.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,83 @@
*/
package tech.pegasys.pantheon.ethereum.storage.keyvalue;

import static java.util.Arrays.asList;

import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.kvstore.ColumnarRocksDbKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration;
import tech.pegasys.pantheon.services.kvstore.RocksDbKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage.Segment;
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorageAdapter;

import java.io.IOException;
import java.nio.file.Files;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class RocksDbStorageProvider {
private static final Logger LOG = LogManager.getLogger();

public static StorageProvider create(
final RocksDbConfiguration rocksDbConfiguration, final MetricsSystem metricsSystem)
throws IOException {
if (rocksDbConfiguration.useColumns()) {
return createSegmentedProvider(rocksDbConfiguration, metricsSystem);
} else {
return createUnsegmentedProvider(rocksDbConfiguration, metricsSystem);
}
}

private static StorageProvider createUnsegmentedProvider(
final RocksDbConfiguration rocksDbConfiguration, final MetricsSystem metricsSystem)
throws IOException {
Files.createDirectories(rocksDbConfiguration.getDatabaseDir());
final KeyValueStorage kv = RocksDbKeyValueStorage.create(rocksDbConfiguration, metricsSystem);
return new KeyValueStorageProvider(kv);
}

private static StorageProvider createSegmentedProvider(
final RocksDbConfiguration rocksDbConfiguration, final MetricsSystem metricsSystem)
throws IOException {
LOG.info("Using RocksDB columns");
Files.createDirectories(rocksDbConfiguration.getDatabaseDir());
final SegmentedKeyValueStorage<?> columnarStorage =
ColumnarRocksDbKeyValueStorage.create(
rocksDbConfiguration, asList(RocksDbSegment.values()), metricsSystem);

return new KeyValueStorageProvider(
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.BLOCKCHAIN, columnarStorage),
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.WORLD_STATE, columnarStorage),
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRIVATE_TRANSACTIONS, columnarStorage),
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRIVATE_STATE, columnarStorage),
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRUNING_STATE, columnarStorage));
}

private enum RocksDbSegment implements Segment {
BLOCKCHAIN((byte) 1),
WORLD_STATE((byte) 2),
PRIVATE_TRANSACTIONS((byte) 3),
PRIVATE_STATE((byte) 4),
PRUNING_STATE((byte) 5);

private final byte[] id;

RocksDbSegment(final byte... id) {
this.id = id;
}

@Override
public String getName() {
return name();
}

@Override
public byte[] getId() {
return id;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class KeyValueStorageWorldStateStorage implements WorldStateStorage {
public class WorldStateKeyValueStorage implements WorldStateStorage {

private final Subscribers<NodesAddedListener> nodeAddedListeners = Subscribers.create();
private final KeyValueStorage keyValueStorage;

public KeyValueStorageWorldStateStorage(final KeyValueStorage keyValueStorage) {
public WorldStateKeyValueStorage(final KeyValueStorage keyValueStorage) {
this.keyValueStorage = keyValueStorage;
}

Expand Down Expand Up @@ -74,15 +79,35 @@ public boolean isWorldStateAvailable(final Bytes32 rootHash) {

@Override
public Updater updater() {
return new Updater(keyValueStorage.startTransaction());
return new Updater(keyValueStorage.startTransaction(), nodeAddedListeners);
}

@Override
public long prune(final Predicate<BytesValue> inUseCheck) {
return keyValueStorage.removeUnless(inUseCheck);
}

@Override
public long addNodeAddedListener(final NodesAddedListener listener) {
return nodeAddedListeners.subscribe(listener);
}

@Override
public void removeNodeAddedListener(final long id) {
nodeAddedListeners.unsubscribe(id);
}

public static class Updater implements WorldStateStorage.Updater {

private final KeyValueStorage.Transaction transaction;
private final Subscribers<NodesAddedListener> nodeAddedListeners;
private final List<Bytes32> addedNodes = new ArrayList<>();

public Updater(final KeyValueStorage.Transaction transaction) {
public Updater(
final KeyValueStorage.Transaction transaction,
final Subscribers<NodesAddedListener> nodeAddedListeners) {
this.transaction = transaction;
this.nodeAddedListeners = nodeAddedListeners;
}

@Override
Expand All @@ -91,6 +116,8 @@ public Updater putCode(final Bytes32 codeHash, final BytesValue code) {
// Don't save empty values
return this;
}

addedNodes.add(codeHash);
transaction.put(codeHash, code);
return this;
}
Expand All @@ -101,6 +128,7 @@ public Updater putAccountStateTrieNode(final Bytes32 nodeHash, final BytesValue
// Don't save empty nodes
return this;
}
addedNodes.add(nodeHash);
transaction.put(nodeHash, node);
return this;
}
Expand All @@ -111,12 +139,14 @@ public Updater putAccountStorageTrieNode(final Bytes32 nodeHash, final BytesValu
// Don't save empty nodes
return this;
}
addedNodes.add(nodeHash);
transaction.put(nodeHash, node);
return this;
}

@Override
public void commit() {
nodeAddedListeners.forEach(listener -> listener.onNodesAdded(addedNodes));
transaction.commit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.core.WorldState;
import tech.pegasys.pantheon.ethereum.core.WorldUpdater;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;

import java.util.Collection;
Expand Down Expand Up @@ -50,7 +50,7 @@ private void addAll(final DebugInfo other) {
private final DebugInfo info = new DebugInfo();

public DebuggableMutableWorldState() {
super(new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()));
super(new WorldStateKeyValueStorage(new InMemoryKeyValueStorage()));
}

public DebuggableMutableWorldState(final WorldState worldState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

public interface WorldStateStorage {

Expand All @@ -36,6 +38,12 @@ default boolean contains(final Bytes32 hash) {

Updater updater();

long prune(Predicate<BytesValue> inUseCheck);

long addNodeAddedListener(NodesAddedListener listener);

void removeNodeAddedListener(long id);

interface Updater {

Updater putCode(Bytes32 nodeHash, BytesValue code);
Expand All @@ -54,4 +62,8 @@ default Updater putCode(final BytesValue code) {

void rollback();
}

interface NodesAddedListener {
void onNodesAdded(Collection<Bytes32> nodeHash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolScheduleBuilder;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
Expand Down Expand Up @@ -53,8 +53,7 @@ private ExecutionContextTestFixture(
new KeyValueStoragePrefixedKeyBlockchainStorage(
keyValueStorage, new MainnetBlockHeaderFunctions()),
new NoOpMetricsSystem());
this.stateArchive =
new WorldStateArchive(new KeyValueStorageWorldStateStorage(keyValueStorage));
this.stateArchive = new WorldStateArchive(new WorldStateKeyValueStorage(keyValueStorage));
this.protocolSchedule = protocolSchedule;
this.protocolContext = new ProtocolContext<>(blockchain, stateArchive, null);
genesisState.writeStateTo(stateArchive.getMutable());
Expand Down
Loading

0 comments on commit 69cee79

Please sign in to comment.