Skip to content

Commit

Permalink
refactor(cli): debug cmd for mempool under feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo authored and Rexagon committed Sep 16, 2024
1 parent 72512a8 commit d168ba6
Show file tree
Hide file tree
Showing 24 changed files with 462 additions and 472 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ clap = { workspace = true }
everscale-crypto = { workspace = true }
everscale-types = { workspace = true }
futures-util = { workspace = true }
getip = { workspace = true }
hex = { workspace = true }
libc = { workspace = true }
metrics = { workspace = true, optional = true }
Expand All @@ -44,7 +43,7 @@ tikv-jemallocator = { workspace = true, features = [
"profiling",
], optional = true }
tikv-jemalloc-ctl = { workspace = true, optional = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "fs"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

Expand All @@ -57,6 +56,7 @@ tycho-network = { workspace = true }
tycho-rpc = { workspace = true }
tycho-storage = { workspace = true }
tycho-util = { workspace = true, features = ["cli"] }
tycho-consensus = { workspace = true, features = ["test"], optional = true }

[dev-dependencies]
tycho-collator = { workspace = true, features = ["test"] }
Expand All @@ -70,6 +70,7 @@ rustc_version = { workspace = true }
default = ["jemalloc"]
jemalloc = ["dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl", "dep:metrics"]
deadlock-detection = ["parking_lot/deadlock_detection"]
debug = ["dep:tycho-consensus"]

[lints]
workspace = true
Expand Down
210 changes: 210 additions & 0 deletions cli/src/debug_cmd/mempool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use clap::Parser;
use everscale_crypto::ed25519;
use tokio::sync::mpsc;
use tycho_consensus::prelude::{Engine, InputBuffer};
use tycho_consensus::test_utils::AnchorConsumer;
use tycho_core::global_config::GlobalConfig;
use tycho_network::{DhtClient, OverlayService, PeerId, PeerResolver};
use tycho_storage::Storage;
use tycho_util::cli::error::ResultExt;
use tycho_util::cli::logger::{init_logger, set_abort_with_tracing};
use tycho_util::cli::{resolve_public_ip, signal};

use crate::node::config::{NodeConfig, NodeKeys};

/// run a node
#[derive(Parser)]
pub struct CmdRun {
/// path to the node config
#[clap(long)]
config: PathBuf,

/// path to the global config
#[clap(long)]
global_config: PathBuf,

/// path to the node keys
#[clap(long)]
keys: PathBuf,

/// path to the logger config
#[clap(long)]
logger_config: Option<PathBuf>,

/// Round of a new consensus genesis
#[clap(long)]
mempool_start_round: Option<u32>,
}

impl CmdRun {
pub fn run(self) -> Result<()> {
let node_config =
NodeConfig::from_file(&self.config).wrap_err("failed to load node config")?;

rayon::ThreadPoolBuilder::new()
.stack_size(8 * 1024 * 1024)
.thread_name(|_| "rayon_worker".to_string())
.num_threads(node_config.threads.rayon_threads)
.build_global()
.unwrap();

tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(node_config.threads.tokio_workers)
.build()?
.block_on(async move {
let run_fut = tokio::spawn(self.run_impl(node_config));
let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS);
tokio::select! {
res = run_fut => res.unwrap(),
signal = stop_fut => match signal {
Ok(signal) => {
tracing::info!(?signal, "received termination signal");
Ok(())
}
Err(e) => Err(e.into()),
}
}
})
}

async fn run_impl(self, node_config: NodeConfig) -> Result<()> {
init_logger(&node_config.logger, self.logger_config)?;
set_abort_with_tracing();

let mempool = {
let global_config = GlobalConfig::from_file(self.global_config)
.wrap_err("failed to load global config")?;

let keys = NodeKeys::from_file(self.keys).wrap_err("failed to load node keys")?;

let public_ip = resolve_public_ip(node_config.public_ip).await?;
let socket_addr = SocketAddr::new(public_ip, node_config.port);

Mempool::new(socket_addr, keys, node_config, global_config).await?
};

let (engine, anchor_consumer) = mempool
.boot(self.mempool_start_round)
.await
.wrap_err("failed to init mempool")?;

tokio::spawn(anchor_consumer.drain());

tracing::info!("starting mempool");

engine.run().await;

Ok(())
}
}

struct Mempool {
keypair: Arc<ed25519::KeyPair>,

dht_client: DhtClient,
peer_resolver: PeerResolver,
overlay_service: OverlayService,
storage: Storage,
all_peers: Vec<PeerId>,
}

impl Mempool {
pub async fn new(
public_addr: SocketAddr,
keys: NodeKeys,
node_config: NodeConfig,
global_config: GlobalConfig,
) -> Result<Self> {
let local_addr = SocketAddr::from((node_config.local_ip, node_config.port));

let (dht_client, peer_resolver, overlay_service) =
tycho_consensus::test_utils::from_validator(
local_addr,
&keys.as_secret(),
Some(public_addr),
node_config.dht,
Some(node_config.peer_resolver),
Some(node_config.overlay),
node_config.network,
);

let keypair = Arc::new(ed25519::KeyPair::from(&keys.as_secret()));
let local_id: PeerId = keypair.public_key.into();

let all_peers = global_config
.bootstrap_peers
.iter()
.map(|info| info.id)
.collect::<Vec<_>>();

let mut bootstrap_peers = 0usize;
for peer in global_config.bootstrap_peers {
let is_new = dht_client.add_peer(Arc::new(peer))?;
bootstrap_peers += is_new as usize;
}

tracing::info!(
%local_id,
%local_addr,
%public_addr,
bootstrap_peers,
"initialized network"
);

// Setup storage
let storage = Storage::builder()
.with_config(node_config.storage)
.build()
.await
.wrap_err("failed to create storage")?;
tracing::info!(
root_dir = %storage.root().path().display(),
"initialized storage"
);

Ok(Self {
keypair,
dht_client,
peer_resolver,
overlay_service,
all_peers,
storage,
})
}

pub async fn boot(self, mempool_start_round: Option<u32>) -> Result<(Engine, AnchorConsumer)> {
let local_id = self.dht_client.network().peer_id();

let (committed_tx, committed_rx) = mpsc::unbounded_channel();
let mut anchor_consumer = AnchorConsumer::default();
anchor_consumer.add(*local_id, committed_rx);

let mut engine = Engine::new(
self.keypair.clone(),
self.dht_client.network(),
&self.peer_resolver,
&self.overlay_service,
self.storage.mempool_storage(),
committed_tx,
anchor_consumer.collator_round(),
InputBuffer::new_stub(
NonZeroUsize::new(100).unwrap(),
NonZeroUsize::new(5).unwrap(),
),
mempool_start_round,
);

engine.init_with_genesis(&self.all_peers);

tracing::info!("mempool engine initialized");

Ok((engine, anchor_consumer))
}
}
26 changes: 26 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ mod tools {

mod node;
mod util;
#[cfg(feature = "debug")]
mod debug_cmd {
pub mod mempool;
}

#[cfg(feature = "jemalloc")]
#[global_allocator]
Expand Down Expand Up @@ -58,13 +62,18 @@ enum Cmd {
Node(NodeCmd),
#[clap(subcommand)]
Tool(ToolCmd),
#[cfg(feature = "debug")]
#[clap(subcommand)]
Debug(DebugCmd),
}

impl Cmd {
fn run(self) -> Result<()> {
match self {
Cmd::Node(cmd) => cmd.run(),
Cmd::Tool(cmd) => cmd.run(),
#[cfg(feature = "debug")]
Cmd::Debug(cmd) => cmd.run(),
}
}
}
Expand Down Expand Up @@ -111,6 +120,23 @@ impl ToolCmd {
}
}

/// A collection of debug modes
#[derive(Subcommand)]
#[cfg(feature = "debug")]
#[allow(clippy::enum_variant_names)]
enum DebugCmd {
Mempool(debug_cmd::mempool::CmdRun),
}

#[cfg(feature = "debug")]
impl DebugCmd {
fn run(self) -> Result<()> {
match self {
Self::Mempool(cmd) => cmd.run(),
}
}
}

fn version_string() -> &'static str {
static STRING: OnceLock<String> = OnceLock::new();
STRING.get_or_init(|| {
Expand Down
22 changes: 2 additions & 20 deletions cli/src/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use tycho_core::overlay_client::PublicOverlayClientConfig;
use tycho_network::{DhtConfig, NetworkConfig, OverlayConfig, PeerResolverConfig};
use tycho_rpc::RpcConfig;
use tycho_storage::StorageConfig;
use tycho_util::cli::LoggerConfig;
use tycho_util::cli::config::ThreadPoolConfig;
use tycho_util::cli::logger::LoggerConfig;

#[derive(Debug, Deserialize)]
pub struct NodeKeys {
Expand Down Expand Up @@ -142,25 +143,6 @@ impl Default for MetricsConfig {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ThreadPoolConfig {
pub rayon_threads: usize,
pub tokio_workers: usize,
}

impl Default for ThreadPoolConfig {
fn default() -> Self {
let total_threads = std::thread::available_parallelism()
.expect("failed to get total threads")
.get();
Self {
rayon_threads: total_threads,
tokio_workers: total_threads,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryProfilingConfig {
pub profiling_dir: PathBuf,
Expand Down
3 changes: 2 additions & 1 deletion cli/src/node/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use everscale_types::models::BlockId;
use serde::Serialize;
use tycho_control::ControlClient;
use tycho_core::block_strider::ManualGcTrigger;
use tycho_util::cli::signal;
use tycho_util::futures::JoinTask;

use crate::util::{print_json, signal};
use crate::util::print_json;

#[derive(Subcommand)]
pub enum CmdControl {
Expand Down
Loading

0 comments on commit d168ba6

Please sign in to comment.