Skip to content

Commit

Permalink
Support reference-counting state backend. (paritytech#5769)
Browse files Browse the repository at this point in the history
* Optimize pinning

* Ref counting state backend

* Style

Co-Authored-By: Wei Tang <hi@that.world>

* Update Cargo.lock

* Handle empty node

Co-authored-by: Wei Tang <hi@that.world>
  • Loading branch information
arkpar and sorpaas authored Apr 27, 2020
1 parent a81dddc commit a516cf2
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 189 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ sp-trie = { version = "2.0.0-dev", path = "../../primitives/trie" }
sp-consensus = { version = "0.8.0-dev", path = "../../primitives/consensus/common" }
sp-blockchain = { version = "2.0.0-dev", path = "../../primitives/blockchain" }
sp-database = { version = "2.0.0-dev", path = "../../primitives/database" }
parity-db = { version = "0.1", optional = true }
parity-db = { version = "0.1.2", optional = true }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.8.0-dev", path = "../../utils/prometheus" }

[dev-dependencies]
Expand Down
54 changes: 43 additions & 11 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ pub type DbState<B> = sp_state_machine::TrieBackend<
Arc<dyn sp_state_machine::Storage<HashFor<B>>>, HashFor<B>
>;

const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
pub type DbHash = [u8; 32];
pub type DbHash = [u8; DB_HASH_LEN];

/// A reference tracking state.
///
Expand Down Expand Up @@ -314,6 +315,13 @@ impl DatabaseSettingsSrc {
DatabaseSettingsSrc::Custom(_) => None,
}
}
/// Check if database supports internal ref counting for state data.
pub fn supports_ref_counting(&self) -> bool {
match self {
DatabaseSettingsSrc::ParityDb { .. } => true,
_ => false,
}
}
}

/// Create an instance of db-backed client.
Expand Down Expand Up @@ -716,13 +724,18 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc
struct StorageDb<Block: BlockT> {
pub db: Arc<dyn Database<DbHash>>,
pub state_db: StateDb<Block::Hash, Vec<u8>>,
prefix_keys: bool,
}

impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for StorageDb<Block> {
fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
let key = prefixed_key::<HashFor<Block>>(key, prefix);
self.state_db.get(&key, self)
.map_err(|e| format!("Database backend error: {:?}", e))
if self.prefix_keys {
let key = prefixed_key::<HashFor<Block>>(key, prefix);
self.state_db.get(&key, self)
} else {
self.state_db.get(key.as_ref(), self)
}
.map_err(|e| format!("Database backend error: {:?}", e))
}
}

Expand Down Expand Up @@ -843,11 +856,15 @@ impl<Block: BlockT> Backend<Block> {
let map_e = |e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(
format!("State database error: {:?}", e)
);
let state_db: StateDb<_, _> = StateDb::new(config.pruning.clone(), &StateMetaDb(&*db))
.map_err(map_e)?;
let state_db: StateDb<_, _> = StateDb::new(
config.pruning.clone(),
!config.source.supports_ref_counting(),
&StateMetaDb(&*db),
).map_err(map_e)?;
let storage_db = StorageDb {
db: db.clone(),
state_db,
prefix_keys: !config.source.supports_ref_counting(),
};
let offchain_storage = offchain::LocalStorage::new(db.clone());
let changes_tries_storage = DbChangesTrieStorage::new(
Expand Down Expand Up @@ -1112,17 +1129,32 @@ impl<Block: BlockT> Backend<Block> {
let mut bytes: u64 = 0;
let mut removal: u64 = 0;
let mut bytes_removal: u64 = 0;
for (key, (val, rc)) in operation.db_updates.drain() {
for (mut key, (val, rc)) in operation.db_updates.drain() {
if !self.storage.prefix_keys {
// Strip prefix
key.drain(0 .. key.len() - DB_HASH_LEN);
};
if rc > 0 {
ops += 1;
bytes += key.len() as u64 + val.len() as u64;

changeset.inserted.push((key, val.to_vec()));
if rc == 1 {
changeset.inserted.push((key, val.to_vec()));
} else {
changeset.inserted.push((key.clone(), val.to_vec()));
for _ in 0 .. rc - 1 {
changeset.inserted.push((key.clone(), Default::default()));
}
}
} else if rc < 0 {
removal += 1;
bytes_removal += key.len() as u64;

changeset.deleted.push(key);
if rc == -1 {
changeset.deleted.push(key);
} else {
for _ in 0 .. -rc {
changeset.deleted.push(key.clone());
}
}
}
}
self.state_usage.tally_writes_nodes(ops, bytes);
Expand Down
11 changes: 9 additions & 2 deletions client/db/src/parity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
/// A `Database` adapter for parity-db.
use sp_database::{Database, Change, Transaction, ColumnId};
use crate::utils::NUM_COLUMNS;
use crate::columns;

struct DbAdapter(parity_db::Db);

Expand All @@ -30,8 +32,13 @@ fn handle_err<T>(result: parity_db::Result<T>) -> T {
}

/// Wrap RocksDb database into a trait object that implements `sp_database::Database`
pub fn open<H: Clone>(path: &std::path::Path, num_columns: u32) -> parity_db::Result<std::sync::Arc<dyn Database<H>>> {
let db = parity_db::Db::with_columns(path, num_columns as u8)?;
pub fn open<H: Clone>(path: &std::path::Path) -> parity_db::Result<std::sync::Arc<dyn Database<H>>> {
let mut config = parity_db::Options::with_columns(path, NUM_COLUMNS as u8);
let mut state_col = &mut config.columns[columns::STATE as usize];
state_col.ref_counted = true;
state_col.preimage = true;
state_col.uniform = true;
let db = parity_db::Db::open(&config)?;
Ok(std::sync::Arc::new(DbAdapter(db)))
}

Expand Down
2 changes: 1 addition & 1 deletion client/db/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ pub fn open_database<Block: BlockT>(
},
#[cfg(feature = "parity-db")]
DatabaseSettingsSrc::ParityDb { path } => {
crate::parity_db::open(&path, NUM_COLUMNS)
crate::parity_db::open(&path)
.map_err(|e| sp_blockchain::Error::Backend(format!("{:?}", e)))?
},
DatabaseSettingsSrc::Custom(db) => db.clone(),
Expand Down
28 changes: 18 additions & 10 deletions client/state-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,10 @@ struct StateDbSync<BlockHash: Hash, Key: Hash> {
impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<BlockHash, Key> {
fn new<D: MetaDb>(
mode: PruningMode,
ref_counting: bool,
db: &D,
) -> Result<StateDbSync<BlockHash, Key>, Error<D::Error>> {
trace!(target: "state-db", "StateDb settings: {:?}", mode);
trace!(target: "state-db", "StateDb settings: {:?}. Ref-counting: {}", mode, ref_counting);

// Check that settings match
Self::check_meta(&mode, db)?;
Expand All @@ -214,7 +215,7 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<Block
max_mem: Some(_),
..
}) => unimplemented!(),
PruningMode::Constrained(_) => Some(RefWindow::new(db)?),
PruningMode::Constrained(_) => Some(RefWindow::new(db, ref_counting)?),
PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => None,
};

Expand Down Expand Up @@ -387,8 +388,11 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<Block
}
}

pub fn get<D: NodeDb>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where Key: AsRef<D::Key>
pub fn get<D: NodeDb, Q: ?Sized>(&self, key: &Q, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where
Q: AsRef<D::Key>,
Key: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
if let Some(value) = self.non_canonical.get(key) {
return Ok(Some(value));
Expand Down Expand Up @@ -438,10 +442,11 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDb<BlockHash
/// Creates a new instance. Does not expect any metadata in the database.
pub fn new<D: MetaDb>(
mode: PruningMode,
ref_counting: bool,
db: &D,
) -> Result<StateDb<BlockHash, Key>, Error<D::Error>> {
Ok(StateDb {
db: RwLock::new(StateDbSync::new(mode, db)?)
db: RwLock::new(StateDbSync::new(mode, ref_counting, db)?)
})
}

Expand Down Expand Up @@ -475,8 +480,11 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDb<BlockHash
}

/// Get a value from non-canonical/pruning overlay or the backing DB.
pub fn get<D: NodeDb>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where Key: AsRef<D::Key>
pub fn get<D: NodeDb, Q: ?Sized>(&self, key: &Q, db: &D) -> Result<Option<DBValue>, Error<D::Error>>
where
Q: AsRef<D::Key>,
Key: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
self.db.read().get(key, db)
}
Expand Down Expand Up @@ -523,7 +531,7 @@ mod tests {

fn make_test_db(settings: PruningMode) -> (TestDb, StateDb<H256, H256>) {
let mut db = make_db(&[91, 921, 922, 93, 94]);
let state_db = StateDb::new(settings, &db).unwrap();
let state_db = StateDb::new(settings, false, &db).unwrap();

db.commit(
&state_db
Expand Down Expand Up @@ -638,7 +646,7 @@ mod tests {
#[test]
fn detects_incompatible_mode() {
let mut db = make_db(&[]);
let state_db = StateDb::new(PruningMode::ArchiveAll, &db).unwrap();
let state_db = StateDb::new(PruningMode::ArchiveAll, false, &db).unwrap();
db.commit(
&state_db
.insert_block::<io::Error>(
Expand All @@ -650,7 +658,7 @@ mod tests {
.unwrap(),
);
let new_mode = PruningMode::Constrained(Constraints { max_blocks: Some(2), max_mem: None });
let state_db: Result<StateDb<H256, H256>, _> = StateDb::new(new_mode, &db);
let state_db: Result<StateDb<H256, H256>, _> = StateDb::new(new_mode, false, &db);
assert!(state_db.is_err());
}
}
Loading

0 comments on commit a516cf2

Please sign in to comment.