Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(merkle tree): Expose Merkle tree API #209

Merged
merged 16 commits into from
Oct 17, 2023
Merged
8 changes: 8 additions & 0 deletions core/lib/config/src/configs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct MerkleTreeConfig {
/// Operation mode for the Merkle tree. If not specified, the full mode will be used.
#[serde(default)]
pub mode: MerkleTreeMode,
/// Port to bind the Merkle tree API server to. If not specified, the API server will not be launched.
#[serde(default)]
pub api_port: Option<u16>,
slowli marked this conversation as resolved.
Show resolved Hide resolved
/// Chunk size for multi-get operations. Can speed up loading data for the Merkle tree on some environments,
/// but the effects vary wildly depending on the setup (e.g., the filesystem used).
#[serde(default = "MerkleTreeConfig::default_multi_get_chunk_size")]
Expand All @@ -49,6 +52,7 @@ impl Default for MerkleTreeConfig {
path: Self::default_path(),
backup_path: Self::default_backup_path(),
mode: MerkleTreeMode::default(),
api_port: None,
multi_get_chunk_size: Self::default_multi_get_chunk_size(),
block_cache_size_mb: Self::default_block_cache_size_mb(),
max_l1_batches_per_iter: Self::default_max_l1_batches_per_iter(),
Expand Down Expand Up @@ -150,6 +154,7 @@ mod tests {
DATABASE_MERKLE_TREE_BACKUP_PATH="/db/backups"
DATABASE_MERKLE_TREE_PATH="/db/tree"
DATABASE_MERKLE_TREE_MODE=lightweight
DATABASE_MERKLE_TREE_API_PORT=3072
DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE=250
DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50
DATABASE_BACKUP_COUNT=5
Expand All @@ -162,6 +167,7 @@ mod tests {
assert_eq!(db_config.merkle_tree.path, "/db/tree");
assert_eq!(db_config.merkle_tree.backup_path, "/db/backups");
assert_eq!(db_config.merkle_tree.mode, MerkleTreeMode::Lightweight);
assert_eq!(db_config.merkle_tree.api_port, Some(3_072));
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 250);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 50);
assert_eq!(db_config.backup_count, 5);
Expand All @@ -176,6 +182,7 @@ mod tests {
"DATABASE_MERKLE_TREE_BACKUP_PATH",
"DATABASE_MERKLE_TREE_PATH",
"DATABASE_MERKLE_TREE_MODE",
"DATABASE_MERKLE_TREE_API_PORT",
"DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE",
"DATABASE_MERKLE_TREE_BLOCK_CACHE_SIZE_MB",
"DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER",
Expand All @@ -188,6 +195,7 @@ mod tests {
assert_eq!(db_config.merkle_tree.path, "./db/lightweight-new");
assert_eq!(db_config.merkle_tree.backup_path, "./db/backups");
assert_eq!(db_config.merkle_tree.mode, MerkleTreeMode::Full);
assert_eq!(db_config.merkle_tree.api_port, None);
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 500);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 20);
assert_eq!(db_config.merkle_tree.block_cache_size_mb, 128);
Expand Down
39 changes: 37 additions & 2 deletions core/lib/merkle_tree/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::{
storage::{MerkleTreeColumnFamily, PatchSet, Patched, RocksDBWrapper},
types::{Key, Root, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH},
BlockOutput, HashTree, MerkleTree,
types::{Key, Root, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH},
BlockOutput, HashTree, MerkleTree, NoVersionError,
};
use zksync_crypto::hasher::blake2::Blake2Hasher;
use zksync_storage::RocksDB;
Expand Down Expand Up @@ -98,6 +98,13 @@ impl ZkSyncTree {
}
}

/// Returns a readonly handle to the tree. The handle **does not** see uncommitted changes to the tree,
/// only ones flushed to RocksDB.
pub fn reader(&self) -> ZkSyncTreeReader {
let db = self.tree.db.inner().clone();
ZkSyncTreeReader(MerkleTree::new(db))
}

/// Sets the chunk size for multi-get operations. The requested keys will be split
/// into chunks of this size and requested in parallel using `rayon`. Setting chunk size
/// to a large value (e.g., `usize::MAX`) will effectively disable parallelism.
Expand Down Expand Up @@ -373,3 +380,31 @@ impl ZkSyncTree {
self.tree.db.reset();
}
}

/// Readonly handle to a [`ZkSyncTree`].
#[derive(Debug)]
pub struct ZkSyncTreeReader(MerkleTree<'static, RocksDBWrapper>);

// While cloning `MerkleTree` is logically unsound, cloning a reader is reasonable since it is readonly.
impl Clone for ZkSyncTreeReader {
fn clone(&self) -> Self {
Self(MerkleTree::new(self.0.db.clone()))
}
}

impl ZkSyncTreeReader {
/// Reads entries together with Merkle proofs with the specified keys from the tree. The entries are returned
/// in the same order as requested.
///
/// # Errors
///
/// Returns an error if the tree `version` is missing.
pub fn entries_with_proofs(
&self,
l1_batch_number: L1BatchNumber,
keys: &[Key],
) -> Result<Vec<TreeEntryWithProof>, NoVersionError> {
let version = u64::from(l1_batch_number.0);
self.0.entries_with_proofs(version, keys)
}
}
5 changes: 5 additions & 0 deletions core/lib/merkle_tree/src/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ impl<DB: Database> Patched<DB> {
.map_or_else(Vec::new, |patch| patch.roots.keys().copied().collect())
}

/// Provides readonly access to the wrapped DB.
pub(crate) fn inner(&self) -> &DB {
&self.inner
}

/// Provides access to the wrapped DB. Should not be used to mutate DB data.
pub(crate) fn inner_mut(&mut self) -> &mut DB {
&mut self.inner
Expand Down
39 changes: 27 additions & 12 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::upper_case_acronyms, clippy::derive_partial_eq_without_eq)]

use std::{str::FromStr, sync::Arc, time::Instant};
use std::{net::Ipv4Addr, str::FromStr, sync::Arc, time::Instant};

use anyhow::Context as _;
use futures::channel::oneshot;
Expand Down Expand Up @@ -733,20 +733,27 @@ async fn add_trees_to_task_futures(
},
(false, false) => return Ok(()),
};
let (future, tree_health_check) = run_tree(&db_config, &operation_config, mode, stop_receiver)
.await
.context("run_tree()")?;
task_futures.push(future);
healthchecks.push(Box::new(tree_health_check));
Ok(())

run_tree(
task_futures,
healthchecks,
&db_config,
&operation_config,
mode,
stop_receiver,
)
.await
.context("run_tree()")
}

async fn run_tree(
config: &DBConfig,
task_futures: &mut Vec<JoinHandle<anyhow::Result<()>>>,
healthchecks: &mut Vec<Box<dyn CheckHealth>>,
db_config: &DBConfig,
operation_manager: &OperationsManagerConfig,
mode: MetadataCalculatorModeConfig<'_>,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, ReactiveHealthCheck)> {
) -> anyhow::Result<()> {
let started_at = Instant::now();
let mode_str = if matches!(mode, MetadataCalculatorModeConfig::Full { .. }) {
"full"
Expand All @@ -755,9 +762,16 @@ async fn run_tree(
};
tracing::info!("Initializing Merkle tree in {mode_str} mode");

let config = MetadataCalculatorConfig::for_main_node(config, operation_manager, mode);
let config = MetadataCalculatorConfig::for_main_node(db_config, operation_manager, mode);
let metadata_calculator = MetadataCalculator::new(&config).await;
if let Some(port) = db_config.merkle_tree.api_port {
let address = (Ipv4Addr::UNSPECIFIED, port).into();
let server_task = metadata_calculator.run_api_server(&address, stop_receiver.clone());
task_futures.push(tokio::spawn(server_task));
}

let tree_health_check = metadata_calculator.tree_health_check();
healthchecks.push(Box::new(tree_health_check));
let pool = ConnectionPool::singleton(DbVariant::Master)
.build()
.await
Expand All @@ -766,7 +780,8 @@ async fn run_tree(
.build()
.await
.context("failed to build prover_pool")?;
let future = tokio::spawn(metadata_calculator.run(pool, prover_pool, stop_receiver));
let tree_task = tokio::spawn(metadata_calculator.run(pool, prover_pool, stop_receiver));
task_futures.push(tree_task);

tracing::info!("Initialized {mode_str} tree in {:?}", started_at.elapsed());
metrics::gauge!(
Expand All @@ -775,7 +790,7 @@ async fn run_tree(
"stage" => "tree",
"tree" => mode_str
);
Ok((future, tree_health_check))
Ok(())
}

async fn add_witness_generator_to_task_futures(
Expand Down
Loading