Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INDY-1205: Use RocksDB as a key-value storage #561

Merged
merged 31 commits into from
Mar 21, 2018
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2fc293c
INDY-1205: Add rocksdb as a key-value backend.
Mar 5, 2018
904751b
Add tests for rocksdb.
Mar 5, 2018
6a694a8
Add rocksdb support.
Mar 5, 2018
747de27
Use rocksdb as a backend storage.
Mar 5, 2018
fd127f6
Fix initialisation of rocksdb kvstore.
Mar 5, 2018
7f9b666
Fix test_kv_rocksdb.
Mar 5, 2018
186f773
Fix comparator of KeyValueStorageRocksdbIntKeys class.
Mar 5, 2018
c9e7a19
Fix test_state_rocksdb.
Mar 5, 2018
b2359fa
Add unified config-based creation of hash store.
Mar 6, 2018
4271498
Change default hash storage from file to rocksdb.
Mar 6, 2018
8b6399f
Integrate rocksdb into state tests.
Mar 6, 2018
6ed63c3
Merge kv storages tests into single module.
Mar 6, 2018
ebed50f
Temporary rollback to leveldb.
Mar 6, 2018
3e88988
Re-factor tests.
Mar 7, 2018
a551c09
Implement the first version of installation of rocksdb and python-roc…
Mar 7, 2018
268398b
Merge leveldb and rocksdb hash storages implementations into single s…
Mar 7, 2018
4945aba
Use RocksDB as a key-value storage.
Mar 7, 2018
b7319ef
Merge remote-tracking branch 'base/master' into feature/INDY-1205
Mar 7, 2018
b2190a8
Tempoprary use leveldb as a default storage for the ledger.
Mar 7, 2018
677d669
Adopt getAllTxn() for working with rocksdb iterator.
Mar 7, 2018
814bf04
Fix db_path property for leveldb and rocksdb, fix test.
Mar 12, 2018
b45fe58
Merge remote-tracking branch 'base/master' into feature/INDY-1205
Mar 12, 2018
237ce3b
Add build procedure for python-rocksdb and setuptool, use librocksdb …
Mar 14, 2018
1b84c91
Add missed libs to docker file.
Mar 14, 2018
6f63c64
Merge remote-tracking branch 'base/master' into feature/INDY-1205
Mar 19, 2018
2cd538e
Change rocksdb package.
Mar 19, 2018
d9f34a1
Change rocksdb package for 3d parties build.
Mar 20, 2018
39ae3d8
Implement get_equal_or_prev() functionality for KeyValueStorageRocksd…
Mar 20, 2018
b976a51
Add a helper for init of k/v storage with int keys.
Mar 20, 2018
3308804
Add rocksdb tests for the equal-or-prev functionality.
Mar 20, 2018
4a2469a
Fallback to leveldb as we do not want to migrate to rocksdb right now.
Mar 20, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ledger/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ledger.tree_hasher import TreeHasher
from ledger.util import F, ConsistencyVerificationFailed
from storage.kv_store import KeyValueStorage
from storage.kv_store_leveldb_int_keys import KeyValueStorageLeveldbIntKeys
from storage.kv_store_rocksdb_int_keys import KeyValueStorageRocksdbIntKeys


class Ledger(ImmutableStore):
Expand All @@ -19,7 +19,7 @@ def _defaultStore(dataDir,
logName,
ensureDurability,
open=True) -> KeyValueStorage:
return KeyValueStorageLeveldbIntKeys(dataDir, logName, open)
return KeyValueStorageRocksdbIntKeys(dataDir, logName, open)

def __init__(self,
tree: MerkleTree,
Expand Down
2 changes: 1 addition & 1 deletion plenum/bls/bls_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from common.serializers.serialization import multi_sig_store_serializer
from plenum.persistence.storage import initKeyValueStorage
from storage.helper import initKeyValueStorage
from crypto.bls.bls_multi_signature import MultiSignature
from typing import Optional

Expand Down
2 changes: 2 additions & 0 deletions plenum/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class StorageType(IntEnum):
class KeyValueStorageType(IntEnum):
Leveldb = 1
Memory = 2
Rocksdb = 3


@unique
Expand All @@ -164,6 +165,7 @@ class LedgerState(IntEnum):
HS_FILE = "file"
HS_MEMORY = "memory"
HS_LEVELDB = 'leveldb'
HS_ROCKSDB = 'rocksdb'

PLUGIN_BASE_DIR_PATH = "PluginBaseDirPath"
POOL_LEDGER_ID = 0
Expand Down
6 changes: 2 additions & 4 deletions plenum/common/stack_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from abc import abstractmethod, ABCMeta
from collections import OrderedDict
import os

from ledger.genesis_txn.genesis_txn_initiator_from_file import GenesisTxnInitiatorFromFile
from plenum.common.keygen_utils import initRemoteKeys
from plenum.common.tools import lazy_field
from plenum.persistence.leveldb_hash_store import LevelDbHashStore
from storage.helper import initHashStore
from stp_core.types import HA
from stp_core.network.exceptions import RemoteNotFound
from stp_core.common.log import getlogger
Expand Down Expand Up @@ -43,8 +42,7 @@ def ledgerFile(self) -> str:

@lazy_field
def hashStore(self):
return LevelDbHashStore(dataDir=self.ledgerLocation,
fileNamePrefix='pool')
return initHashStore(self.ledgerLocation, 'pool', self.config)

# noinspection PyTypeChecker
@lazy_field
Expand Down
5 changes: 3 additions & 2 deletions plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import logging

from plenum.common.constants import ClientBootStrategy, HS_FILE, KeyValueStorageType
from plenum.common.constants import ClientBootStrategy, HS_FILE, HS_LEVELDB, \
HS_ROCKSDB, HS_MEMORY, KeyValueStorageType
from plenum.common.types import PLUGIN_TYPE_STATS_CONSUMER

# Each entry in registry is (stack name, ((host, port), verkey, pubkey))
Expand Down Expand Up @@ -59,7 +60,7 @@
clientBootStrategy = ClientBootStrategy.PoolTxn

hashStore = {
"type": HS_FILE
"type": HS_LEVELDB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please use rocksdb here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, thanks)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seriously speaking, I've made temporary roll back to leveldb as rocksdb building procedure is not ready yet.

}

primaryStorage = None
Expand Down
6 changes: 2 additions & 4 deletions plenum/persistence/client_txn_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from plenum.common.has_file_storage import HasFileStorage
from plenum.common.txn_util import getTxnOrderedFields
from plenum.common.util import updateFieldsWithSeqNo
from storage.kv_store_leveldb import KeyValueStorageLeveldb
from storage.kv_store_rocksdb import KeyValueStorageRocksdb


class ClientTxnLog(HasFileStorage):
Expand All @@ -17,9 +17,7 @@ def __init__(self, dataLocation):
self.clientDataLocation = self.dataLocation
if not os.path.exists(self.clientDataLocation):
os.makedirs(self.clientDataLocation)
# self.transactionLog = TextFileStore(self.clientDataLocation,
# "transactions")
self.transactionLog = KeyValueStorageLeveldb(
self.transactionLog = KeyValueStorageRocksdb(
self.clientDataLocation, "transactions")
self.serializer = ledger_txn_serializer

Expand Down
85 changes: 85 additions & 0 deletions plenum/persistence/rocksdb_hash_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from ledger.hash_stores.hash_store import HashStore
from storage.kv_store_rocksdb import KeyValueStorageRocksdb
from stp_core.common.log import getlogger


logger = getlogger()


class RocksDbHashStore(HashStore):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation looks exactly the same as LevelDbHashStore. Maybe use just one implementation with Dependency Injection of a proper key-value implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I propose to merge hash stores implementations after rocksdb build procedure is ready.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

def __init__(self, dataDir, fileNamePrefix=""):
self.dataDir = dataDir
self.nodesDb = None
self.leavesDb = None
self.nodes_db_name = fileNamePrefix + '_merkleNodes'
self.leaves_db_name = fileNamePrefix + '_merkleLeaves'
self.open()

def writeLeaf(self, leafHash):
self.leavesDb.put(str(self.leafCount + 1), leafHash)
self.leafCount += 1

def writeNode(self, node):
start, height, nodeHash = node
seqNo = self.getNodePosition(start, height)
self.nodesDb.put(str(seqNo), nodeHash)

def readLeaf(self, seqNo):
return self._readOne(seqNo, self.leavesDb)

def readNode(self, seqNo):
return self._readOne(seqNo, self.nodesDb)

def _readOne(self, pos, db):
self._validatePos(pos)
try:
# Converting any bytearray to bytes
return bytes(db.get(str(pos)))
except KeyError:
logger.error("{} does not have position {}".format(db, pos))

def readLeafs(self, start, end):
return self._readMultiple(start, end, self.leavesDb)

def readNodes(self, start, end):
return self._readMultiple(start, end, self.nodesDb)

def _readMultiple(self, start, end, db):
"""
Returns a list of hashes with serial numbers between start
and end, both inclusive.
"""
self._validatePos(start, end)
# Converting any bytearray to bytes
return [bytes(db.get(str(pos))) for pos in range(start, end + 1)]

@property
def leafCount(self) -> int:
return self.leavesDb.size

@property
def nodeCount(self) -> int:
return self.nodesDb.size

@leafCount.setter
def leafCount(self, count: int) -> None:
self._leafCount = count

@property
def closed(self):
return self.nodesDb is None and self.leavesDb is None

def open(self):
self.nodesDb = KeyValueStorageRocksdb(self.dataDir, self.nodes_db_name)
self.leavesDb = KeyValueStorageRocksdb(
self.dataDir, self.leaves_db_name)

def close(self):
self.nodesDb.close()
self.leavesDb.close()

def reset(self) -> bool:
self.nodesDb.reset()
self.leavesDb.reset()
self.leafCount = 0
return True
17 changes: 2 additions & 15 deletions plenum/persistence/storage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from abc import abstractmethod, ABC

from plenum.common.constants import StorageType, KeyValueStorageType
from plenum.common.exceptions import DataDirectoryNotFound, KeyValueStorageConfigNotFound
from plenum.common.constants import StorageType
from plenum.common.exceptions import DataDirectoryNotFound
from plenum.common.messages.node_messages import Reply
from storage.kv_in_memory import KeyValueStorageInMemory
from storage.kv_store import KeyValueStorage
from storage.kv_store_leveldb import KeyValueStorageLeveldb
from storage.text_file_store import TextFileStore


Expand All @@ -27,16 +24,6 @@ async def get(self, identifier: str, reqId: int, **kwargs):
pass


def initKeyValueStorage(keyValueType, dataLocation,
keyValueStorageName) -> KeyValueStorage:
if keyValueType == KeyValueStorageType.Leveldb:
return KeyValueStorageLeveldb(dataLocation, keyValueStorageName)
elif keyValueType == KeyValueStorageType.Memory:
return KeyValueStorageInMemory()
else:
raise KeyValueStorageConfigNotFound


def initStorage(storageType, name, dataDir=None, config=None):
if storageType == StorageType.File:
if dataDir is None:
Expand Down
22 changes: 7 additions & 15 deletions plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from plenum.common.constants import POOL_LEDGER_ID, DOMAIN_LEDGER_ID, \
CLIENT_BLACKLISTER_SUFFIX, CONFIG_LEDGER_ID, \
NODE_BLACKLISTER_SUFFIX, NODE_PRIMARY_STORAGE_SUFFIX, HS_FILE, HS_LEVELDB, \
TXN_TYPE, LEDGER_STATUS, \
HS_ROCKSDB, TXN_TYPE, LEDGER_STATUS, \
CLIENT_STACK_SUFFIX, PRIMARY_SELECTION_PREFIX, VIEW_CHANGE_PREFIX, \
OP_FIELD_NAME, CATCH_UP_PREFIX, NYM, \
GET_TXN, DATA, TXN_TIME, VERKEY, \
Expand Down Expand Up @@ -59,8 +59,9 @@
compare_3PC_keys, get_utc_epoch
from plenum.common.verifier import DidVerifier
from plenum.persistence.leveldb_hash_store import LevelDbHashStore
from plenum.persistence.rocksdb_hash_store import RocksDbHashStore
from plenum.persistence.req_id_to_txn import ReqIdrToTxn
from plenum.persistence.storage import Storage, initStorage, initKeyValueStorage
from plenum.persistence.storage import Storage, initStorage
from plenum.server.blacklister import Blacklister
from plenum.server.blacklister import SimpleBlacklister
from plenum.server.client_authn import ClientAuthNr, SimpleAuthNr, CoreAuthNr
Expand Down Expand Up @@ -91,6 +92,7 @@
from plenum.common.config_helper import PNodeConfigHelper
from state.pruning_state import PruningState
from state.state import State
from storage.helper import initKeyValueStorage, initHashStore
from stp_core.common.log import getlogger
from stp_core.crypto.signer import Signer
from stp_core.network.exceptions import RemoteNotFound
Expand Down Expand Up @@ -447,9 +449,7 @@ def setup_config_req_handler(self):
self.register_req_handler(CONFIG_LEDGER_ID, self.configReqHandler)

def getConfigLedger(self):
hashStore = LevelDbHashStore(
dataDir=self.dataLocation, fileNamePrefix='config')
return Ledger(CompactMerkleTree(hashStore=hashStore),
return Ledger(CompactMerkleTree(hashStore=self.getHashStore('config')),
dataDir=self.dataLocation,
fileName=self.config.configTransactionsFile,
ensureDurability=self.config.EnsureLedgerDurability)
Expand Down Expand Up @@ -717,15 +717,7 @@ def getHashStore(self, name) -> HashStore:
"""
Create and return a hashStore implementation based on configuration
"""
hsConfig = self.config.hashStore['type'].lower()
if hsConfig == HS_FILE:
return FileHashStore(dataDir=self.dataLocation,
fileNamePrefix=name)
elif hsConfig == HS_LEVELDB:
return LevelDbHashStore(dataDir=self.dataLocation,
fileNamePrefix=name)
else:
return MemoryHashStore()
return initHashStore(self.dataLocation, name, self.config)

def get_new_ledger_manager(self) -> LedgerManager:
ledger_sync_order = self.ledger_ids
Expand Down Expand Up @@ -940,7 +932,7 @@ def onStopping(self):

def closeAllKVStores(self):
# Clear leveldb lock files
logger.debug("{} closing level dbs".format(self), extra={"cli": False})
logger.debug("{} closing level/rocks dbs".format(self), extra={"cli": False})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better comment: closing key-value storages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for ledgerId in self.ledgerManager.ledgerRegistry:
state = self.getState(ledgerId)
if state:
Expand Down
2 changes: 1 addition & 1 deletion plenum/server/pool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from plenum.common.exceptions import UnsupportedOperation
from plenum.common.stack_manager import TxnStackManager
from plenum.common.types import NodeDetail
from plenum.persistence.storage import initKeyValueStorage
from storage.helper import initKeyValueStorage
from plenum.persistence.util import pop_merkle_info
from plenum.server.pool_req_handler import PoolRequestHandler
from state.pruning_state import PruningState
Expand Down
2 changes: 1 addition & 1 deletion plenum/test/plugin/demo_plugin/storage.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from ledger.compact_merkle_tree import CompactMerkleTree
from plenum.common.ledger import Ledger
from plenum.persistence.leveldb_hash_store import LevelDbHashStore
from plenum.persistence.storage import initKeyValueStorage
from state.pruning_state import PruningState
from storage.helper import initKeyValueStorage


def get_auction_hash_store(data_dir):
Expand Down
56 changes: 56 additions & 0 deletions plenum/test/storage/test_rocksdb_hash_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest

from ledger.compact_merkle_tree import CompactMerkleTree
from ledger.ledger import Ledger
from ledger.test.test_file_hash_store import nodesLeaves, \
generateHashes

from plenum.persistence.rocksdb_hash_store import RocksDbHashStore


@pytest.yield_fixture(scope="module")
def rocksdbHashStore(tdir):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there the same tests as for LevelDbHashStore? maybe just use the same test with parametrized fixtures?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

hs = RocksDbHashStore(tdir)
cleanup(hs)
yield hs
hs.close()


def cleanup(hs):
hs.reset()
hs.leafCount = 0


def testIndexFrom1(rocksdbHashStore):
with pytest.raises(IndexError):
rocksdbHashStore.readLeaf(0)


def testReadWrite(rocksdbHashStore, nodesLeaves):
nodes, leaves = nodesLeaves
for node in nodes:
rocksdbHashStore.writeNode(node)
for leaf in leaves:
rocksdbHashStore.writeLeaf(leaf)
onebyone = [rocksdbHashStore.readLeaf(i + 1) for i in range(10)]
multiple = rocksdbHashStore.readLeafs(1, 10)
assert onebyone == leaves
assert onebyone == multiple


def testRecoverLedgerFromHashStore(rocksdbHashStore, tdir):
cleanup(rocksdbHashStore)
tree = CompactMerkleTree(hashStore=rocksdbHashStore)
ledger = Ledger(tree=tree, dataDir=tdir)
for d in range(10):
ledger.add(str(d).encode())
updatedTree = ledger.tree
ledger.stop()

tree = CompactMerkleTree(hashStore=rocksdbHashStore)
restartedLedger = Ledger(tree=tree, dataDir=tdir)
assert restartedLedger.size == ledger.size
assert restartedLedger.root_hash == ledger.root_hash
assert restartedLedger.tree.hashes == updatedTree.hashes
assert restartedLedger.tree.root_hash == updatedTree.root_hash
restartedLedger.stop()
10 changes: 7 additions & 3 deletions state/test/test_pruning_state.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import copy

import pytest
from storage.kv_store import KeyValueStorage
from state.pruning_state import PruningState
from state.state import State
from state.trie.pruning_trie import BLANK_NODE, BLANK_ROOT
from storage.kv_in_memory import KeyValueStorageInMemory
from storage.kv_store_leveldb import KeyValueStorageLeveldb
from storage.kv_store_rocksdb import KeyValueStorageRocksdb

i = 0


@pytest.yield_fixture(scope="function", params=['leveldb', 'in_memory'])
def db(request, tempdir) -> State:
if request == 'leveldb':
@pytest.yield_fixture(scope="function", params=['rocksdb', 'leveldb', 'in_memory'])
def db(request, tempdir) -> KeyValueStorage:
if request.param == 'leveldb':
return KeyValueStorageLeveldb(tempdir, 'kv{}'.format(i))
if request.param == 'rocksdb':
return KeyValueStorageRocksdb(tempdir, 'kv{}'.format(i))
return KeyValueStorageInMemory()


Expand Down
Loading