-
Notifications
You must be signed in to change notification settings - Fork 370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INDY-1205: Use RocksDB as a key-value storage #561
Changes from 13 commits
2fc293c
904751b
6a694a8
747de27
fd127f6
7f9b666
186f773
c9e7a19
b2359fa
4271498
8b6399f
6ed63c3
ebed50f
3e88988
a551c09
268398b
4945aba
b7319ef
b2190a8
677d669
814bf04
b45fe58
237ce3b
1b84c91
6f63c64
2cd538e
d9f34a1
39ae3d8
b976a51
3308804
4a2469a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from ledger.hash_stores.hash_store import HashStore | ||
from storage.kv_store_rocksdb import KeyValueStorageRocksdb | ||
from stp_core.common.log import getlogger | ||
|
||
|
||
logger = getlogger() | ||
|
||
|
||
class RocksDbHashStore(HashStore): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The implementation looks exactly the same as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but I propose to merge hash stores implementations after rocksdb build procedure is ready. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
def __init__(self, dataDir, fileNamePrefix=""): | ||
self.dataDir = dataDir | ||
self.nodesDb = None | ||
self.leavesDb = None | ||
self.nodes_db_name = fileNamePrefix + '_merkleNodes' | ||
self.leaves_db_name = fileNamePrefix + '_merkleLeaves' | ||
self.open() | ||
|
||
def writeLeaf(self, leafHash): | ||
self.leavesDb.put(str(self.leafCount + 1), leafHash) | ||
self.leafCount += 1 | ||
|
||
def writeNode(self, node): | ||
start, height, nodeHash = node | ||
seqNo = self.getNodePosition(start, height) | ||
self.nodesDb.put(str(seqNo), nodeHash) | ||
|
||
def readLeaf(self, seqNo): | ||
return self._readOne(seqNo, self.leavesDb) | ||
|
||
def readNode(self, seqNo): | ||
return self._readOne(seqNo, self.nodesDb) | ||
|
||
def _readOne(self, pos, db): | ||
self._validatePos(pos) | ||
try: | ||
# Converting any bytearray to bytes | ||
return bytes(db.get(str(pos))) | ||
except KeyError: | ||
logger.error("{} does not have position {}".format(db, pos)) | ||
|
||
def readLeafs(self, start, end): | ||
return self._readMultiple(start, end, self.leavesDb) | ||
|
||
def readNodes(self, start, end): | ||
return self._readMultiple(start, end, self.nodesDb) | ||
|
||
def _readMultiple(self, start, end, db): | ||
""" | ||
Returns a list of hashes with serial numbers between start | ||
and end, both inclusive. | ||
""" | ||
self._validatePos(start, end) | ||
# Converting any bytearray to bytes | ||
return [bytes(db.get(str(pos))) for pos in range(start, end + 1)] | ||
|
||
@property | ||
def leafCount(self) -> int: | ||
return self.leavesDb.size | ||
|
||
@property | ||
def nodeCount(self) -> int: | ||
return self.nodesDb.size | ||
|
||
@leafCount.setter | ||
def leafCount(self, count: int) -> None: | ||
self._leafCount = count | ||
|
||
@property | ||
def closed(self): | ||
return self.nodesDb is None and self.leavesDb is None | ||
|
||
def open(self): | ||
self.nodesDb = KeyValueStorageRocksdb(self.dataDir, self.nodes_db_name) | ||
self.leavesDb = KeyValueStorageRocksdb( | ||
self.dataDir, self.leaves_db_name) | ||
|
||
def close(self): | ||
self.nodesDb.close() | ||
self.leavesDb.close() | ||
|
||
def reset(self) -> bool: | ||
self.nodesDb.reset() | ||
self.leavesDb.reset() | ||
self.leafCount = 0 | ||
return True |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ | |
from plenum.common.constants import POOL_LEDGER_ID, DOMAIN_LEDGER_ID, \ | ||
CLIENT_BLACKLISTER_SUFFIX, CONFIG_LEDGER_ID, \ | ||
NODE_BLACKLISTER_SUFFIX, NODE_PRIMARY_STORAGE_SUFFIX, HS_FILE, HS_LEVELDB, \ | ||
TXN_TYPE, LEDGER_STATUS, \ | ||
HS_ROCKSDB, TXN_TYPE, LEDGER_STATUS, \ | ||
CLIENT_STACK_SUFFIX, PRIMARY_SELECTION_PREFIX, VIEW_CHANGE_PREFIX, \ | ||
OP_FIELD_NAME, CATCH_UP_PREFIX, NYM, \ | ||
GET_TXN, DATA, TXN_TIME, VERKEY, \ | ||
|
@@ -59,8 +59,9 @@ | |
compare_3PC_keys, get_utc_epoch | ||
from plenum.common.verifier import DidVerifier | ||
from plenum.persistence.leveldb_hash_store import LevelDbHashStore | ||
from plenum.persistence.rocksdb_hash_store import RocksDbHashStore | ||
from plenum.persistence.req_id_to_txn import ReqIdrToTxn | ||
from plenum.persistence.storage import Storage, initStorage, initKeyValueStorage | ||
from plenum.persistence.storage import Storage, initStorage | ||
from plenum.server.blacklister import Blacklister | ||
from plenum.server.blacklister import SimpleBlacklister | ||
from plenum.server.client_authn import ClientAuthNr, SimpleAuthNr, CoreAuthNr | ||
|
@@ -91,6 +92,7 @@ | |
from plenum.common.config_helper import PNodeConfigHelper | ||
from state.pruning_state import PruningState | ||
from state.state import State | ||
from storage.helper import initKeyValueStorage, initHashStore | ||
from stp_core.common.log import getlogger | ||
from stp_core.crypto.signer import Signer | ||
from stp_core.network.exceptions import RemoteNotFound | ||
|
@@ -447,9 +449,7 @@ def setup_config_req_handler(self): | |
self.register_req_handler(CONFIG_LEDGER_ID, self.configReqHandler) | ||
|
||
def getConfigLedger(self): | ||
hashStore = LevelDbHashStore( | ||
dataDir=self.dataLocation, fileNamePrefix='config') | ||
return Ledger(CompactMerkleTree(hashStore=hashStore), | ||
return Ledger(CompactMerkleTree(hashStore=self.getHashStore('config')), | ||
dataDir=self.dataLocation, | ||
fileName=self.config.configTransactionsFile, | ||
ensureDurability=self.config.EnsureLedgerDurability) | ||
|
@@ -717,15 +717,7 @@ def getHashStore(self, name) -> HashStore: | |
""" | ||
Create and return a hashStore implementation based on configuration | ||
""" | ||
hsConfig = self.config.hashStore['type'].lower() | ||
if hsConfig == HS_FILE: | ||
return FileHashStore(dataDir=self.dataLocation, | ||
fileNamePrefix=name) | ||
elif hsConfig == HS_LEVELDB: | ||
return LevelDbHashStore(dataDir=self.dataLocation, | ||
fileNamePrefix=name) | ||
else: | ||
return MemoryHashStore() | ||
return initHashStore(self.dataLocation, name, self.config) | ||
|
||
def get_new_ledger_manager(self) -> LedgerManager: | ||
ledger_sync_order = self.ledger_ids | ||
|
@@ -940,7 +932,7 @@ def onStopping(self): | |
|
||
def closeAllKVStores(self): | ||
# Clear leveldb lock files | ||
logger.debug("{} closing level dbs".format(self), extra={"cli": False}) | ||
logger.debug("{} closing level/rocks dbs".format(self), extra={"cli": False}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A better comment: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
for ledgerId in self.ledgerManager.ledgerRegistry: | ||
state = self.getState(ledgerId) | ||
if state: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import pytest | ||
|
||
from ledger.compact_merkle_tree import CompactMerkleTree | ||
from ledger.ledger import Ledger | ||
from ledger.test.test_file_hash_store import nodesLeaves, \ | ||
generateHashes | ||
|
||
from plenum.persistence.rocksdb_hash_store import RocksDbHashStore | ||
|
||
|
||
@pytest.yield_fixture(scope="module") | ||
def rocksdbHashStore(tdir): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there the same tests as for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
hs = RocksDbHashStore(tdir) | ||
cleanup(hs) | ||
yield hs | ||
hs.close() | ||
|
||
|
||
def cleanup(hs): | ||
hs.reset() | ||
hs.leafCount = 0 | ||
|
||
|
||
def testIndexFrom1(rocksdbHashStore): | ||
with pytest.raises(IndexError): | ||
rocksdbHashStore.readLeaf(0) | ||
|
||
|
||
def testReadWrite(rocksdbHashStore, nodesLeaves): | ||
nodes, leaves = nodesLeaves | ||
for node in nodes: | ||
rocksdbHashStore.writeNode(node) | ||
for leaf in leaves: | ||
rocksdbHashStore.writeLeaf(leaf) | ||
onebyone = [rocksdbHashStore.readLeaf(i + 1) for i in range(10)] | ||
multiple = rocksdbHashStore.readLeafs(1, 10) | ||
assert onebyone == leaves | ||
assert onebyone == multiple | ||
|
||
|
||
def testRecoverLedgerFromHashStore(rocksdbHashStore, tdir): | ||
cleanup(rocksdbHashStore) | ||
tree = CompactMerkleTree(hashStore=rocksdbHashStore) | ||
ledger = Ledger(tree=tree, dataDir=tdir) | ||
for d in range(10): | ||
ledger.add(str(d).encode()) | ||
updatedTree = ledger.tree | ||
ledger.stop() | ||
|
||
tree = CompactMerkleTree(hashStore=rocksdbHashStore) | ||
restartedLedger = Ledger(tree=tree, dataDir=tdir) | ||
assert restartedLedger.size == ledger.size | ||
assert restartedLedger.root_hash == ledger.root_hash | ||
assert restartedLedger.tree.hashes == updatedTree.hashes | ||
assert restartedLedger.tree.root_hash == updatedTree.root_hash | ||
restartedLedger.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please use rocksdb here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, thanks)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seriously speaking, I've made temporary roll back to leveldb as rocksdb building procedure is not ready yet.