diff --git a/Cargo.lock b/Cargo.lock index 4726b8e57..16f5c0b9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3417,6 +3417,7 @@ version = "0.0.1" dependencies = [ "anyhow", "async-trait", + "everscale-crypto", "everscale-types", "futures-util", "serde", @@ -3425,6 +3426,7 @@ dependencies = [ "tokio", "tracing", "tycho-core", + "tycho-network", "tycho-storage", "tycho-util", ] @@ -3599,6 +3601,7 @@ version = "0.0.1" dependencies = [ "ahash", "anyhow", + "base64", "bytes", "castaway", "dashmap", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 70d41d799..bb9e32e72 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -52,7 +52,7 @@ tracing-subscriber = { workspace = true } # local deps tycho-block-util = { workspace = true } tycho-collator = { workspace = true } -tycho-control = { workspace = true } +tycho-control = { workspace = true, features = ["full"] } tycho-core = { workspace = true } tycho-network = { workspace = true } tycho-rpc = { workspace = true } diff --git a/cli/src/node/control.rs b/cli/src/node/control.rs index f4460d3c1..fa3d77933 100644 --- a/cli/src/node/control.rs +++ b/cli/src/node/control.rs @@ -7,7 +7,6 @@ use clap::{Parser, Subcommand}; use everscale_types::models::BlockId; use serde::Serialize; use tycho_control::ControlClient; -use tycho_core::block_strider::ManualGcTrigger; use tycho_util::cli::signal; use tycho_util::futures::JoinTask; @@ -16,6 +15,7 @@ use crate::util::print_json; #[derive(Subcommand)] pub enum CmdControl { Ping(CmdPing), + GetInfo(CmdGetInfo), FindArchive(CmdFindArchive), ListArchives(CmdListArchives), DumpArchive(CmdDumpArchive), @@ -34,6 +34,7 @@ impl CmdControl { pub fn run(self) -> Result<()> { match self { Self::Ping(cmd) => cmd.run(), + Self::GetInfo(cmd) => cmd.run(), Self::FindArchive(cmd) => cmd.run(), Self::ListArchives(cmd) => cmd.run(), Self::DumpArchive(cmd) => cmd.run(), @@ -68,6 +69,28 @@ impl CmdPing { } } +/// Get a brief node info. +#[derive(Parser)] +pub struct CmdGetInfo { + /// Unix socket path to connect to. + #[clap(short, long)] + sock: Option, +} + +impl CmdGetInfo { + pub fn run(self) -> Result<()> { + control_rt(self.sock, |client| async move { + let info = client.get_node_info().await?; + print_json(serde_json::json!({ + "public_addr": info.public_addr, + "local_addr": info.local_addr.to_string(), + "adnl_id": info.adnl_id, + "validator_public_key": info.public_addr, + })) + }) + } +} + /// Trigger a garbage collection of archives. #[derive(Parser)] pub struct CmdGcArchives { @@ -527,11 +550,11 @@ struct TriggerBy { pub distance: Option, } -impl From for ManualGcTrigger { +impl From for tycho_control::proto::TriggerGcRequest { fn from(value: TriggerBy) -> Self { match (value.seqno, value.distance) { - (Some(seqno), None) => ManualGcTrigger::Exact(seqno), - (None, Some(distance)) => ManualGcTrigger::Distance(distance), + (Some(seqno), None) => tycho_control::proto::TriggerGcRequest::Exact(seqno), + (None, Some(distance)) => tycho_control::proto::TriggerGcRequest::Distance(distance), _ => unreachable!(), } } diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs index 837eaadc8..851c1a97c 100644 --- a/cli/src/node/mod.rs +++ b/cli/src/node/mod.rs @@ -249,6 +249,7 @@ pub struct Node { zerostate: ZerostateId, + network: Network, dht_client: DhtClient, peer_resolver: PeerResolver, overlay_service: OverlayService, @@ -367,7 +368,7 @@ impl Node { let blockchain_rpc_client = BlockchainRpcClient::builder() .with_public_overlay_client(PublicOverlayClient::new( - network, + network.clone(), public_overlay, node_config.public_overlay_client, )) @@ -384,6 +385,7 @@ impl Node { Ok(Self { keypair, + network, zerostate, dht_client, peer_resolver, @@ -561,8 +563,10 @@ impl Node { let _control_state = if let Some(config) = &self.control_config { let server = { let mut builder = ControlServer::builder() + .with_network(&self.network) .with_gc_subscriber(gc_subscriber.clone()) - .with_storage(self.storage.clone()); + .with_storage(self.storage.clone()) + .with_validator_keypair(self.keypair.clone()); #[cfg(feature = "jemalloc")] if let Some(profiler) = JemallocMemoryProfiler::connect() { diff --git a/control/Cargo.toml b/control/Cargo.toml index 125e7906c..e3a2e1113 100644 --- a/control/Cargo.toml +++ b/control/Cargo.toml @@ -8,19 +8,28 @@ rust-version.workspace = true repository.workspace = true license.workspace = true +# TODO: Make all dependencies optional to allow using just a plain proto + [dependencies] # crates.io deps anyhow = { workspace = true } async-trait = { workspace = true } +everscale-crypto = { workspace = true } everscale-types = { workspace = true } futures-util = { workspace = true } serde = { workspace = true } tarpc = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["sync", "rt"] } tracing = { workspace = true } # local deps -tycho-core = { workspace = true } -tycho-storage = { workspace = true } +tycho-core = { workspace = true, optional = true } +tycho-network = { workspace = true, optional = true } +tycho-storage = { workspace = true, optional = true } tycho-util = { workspace = true } + +[features] +full = ["client", "server"] +client = [] +server = ["dep:tycho-core", "dep:tycho-network", "dep:tycho-storage"] diff --git a/control/src/client.rs b/control/src/client.rs index 7e7eee82f..386f3abe4 100644 --- a/control/src/client.rs +++ b/control/src/client.rs @@ -7,7 +7,6 @@ use tarpc::tokio_serde::formats::Bincode; use tarpc::{client, context}; use tokio::sync::mpsc; use tracing::Instrument; -use tycho_core::block_strider::ManualGcTrigger; use tycho_util::compression::ZstdDecompressStream; use tycho_util::futures::JoinTask; @@ -33,23 +32,30 @@ impl ControlClient { self.inner.ping(current_context()).await.map_err(Into::into) } - pub async fn trigger_archives_gc(&self, trigger: ManualGcTrigger) -> ClientResult<()> { + pub async fn get_node_info(&self) -> ClientResult { self.inner - .trigger_archives_gc(current_context(), trigger) + .get_node_info(current_context()) .await .map_err(Into::into) } - pub async fn trigger_blocks_gc(&self, trigger: ManualGcTrigger) -> ClientResult<()> { + pub async fn trigger_archives_gc(&self, req: TriggerGcRequest) -> ClientResult<()> { self.inner - .trigger_blocks_gc(current_context(), trigger) + .trigger_archives_gc(current_context(), req) .await .map_err(Into::into) } - pub async fn trigger_states_gc(&self, trigger: ManualGcTrigger) -> ClientResult<()> { + pub async fn trigger_blocks_gc(&self, req: TriggerGcRequest) -> ClientResult<()> { self.inner - .trigger_states_gc(current_context(), trigger) + .trigger_blocks_gc(current_context(), req) + .await + .map_err(Into::into) + } + + pub async fn trigger_states_gc(&self, req: TriggerGcRequest) -> ClientResult<()> { + self.inner + .trigger_states_gc(current_context(), req) .await .map_err(Into::into) } diff --git a/control/src/error.rs b/control/src/error.rs index d5522f82c..4af45fb1f 100644 --- a/control/src/error.rs +++ b/control/src/error.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; +#[cfg(feature = "client")] #[derive(Debug, thiserror::Error)] pub enum ClientError { #[error("client failed: {0}")] @@ -10,6 +11,7 @@ pub enum ClientError { Internal(#[from] ServerError), } +#[cfg(feature = "client")] pub type ClientResult = std::result::Result; #[derive(Debug, thiserror::Error, Serialize, Deserialize)] diff --git a/control/src/lib.rs b/control/src/lib.rs index 1951762c1..73840cf8b 100644 --- a/control/src/lib.rs +++ b/control/src/lib.rs @@ -1,12 +1,22 @@ +#[cfg(feature = "client")] pub use self::client::ControlClient; -pub use self::error::{ClientError, ServerResult}; +#[cfg(feature = "client")] +pub use self::error::{ClientError, ClientResult}; +pub use self::error::{ServerError, ServerResult}; +#[cfg(feature = "server")] pub use self::profiler::{MemoryProfiler, StubMemoryProfiler}; +#[cfg(feature = "server")] pub use self::server::{ControlEndpoint, ControlServer, ControlServerBuilder, ControlServerConfig}; -mod client; mod error; +#[cfg(feature = "server")] mod profiler; pub mod proto; + +#[cfg(feature = "client")] +mod client; + +#[cfg(feature = "server")] mod server; // TODO: Change the path to a more general setup. diff --git a/control/src/proto.rs b/control/src/proto.rs index c9e6de385..77cc71c3d 100644 --- a/control/src/proto.rs +++ b/control/src/proto.rs @@ -1,8 +1,10 @@ +use std::net::SocketAddr; use std::num::{NonZeroU32, NonZeroU64}; use everscale_types::models::{BlockId, BlockIdShort}; +use everscale_types::prelude::*; use serde::{Deserialize, Serialize}; -use tycho_core::block_strider::ManualGcTrigger; +use tycho_util::serde_helpers; use crate::error::ServerResult; @@ -11,14 +13,17 @@ pub trait ControlServer { /// Ping a node. Returns node timestamp in milliseconds. async fn ping() -> u64; + /// Returns a node info. + async fn get_node_info() -> NodeInfoResponse; + /// Trigger manual GC for archives. - async fn trigger_archives_gc(trigger: ManualGcTrigger); + async fn trigger_archives_gc(req: TriggerGcRequest); /// Trigger manual GC for blocks. - async fn trigger_blocks_gc(trigger: ManualGcTrigger); + async fn trigger_blocks_gc(req: TriggerGcRequest); /// Trigger manual GC for states. - async fn trigger_states_gc(trigger: ManualGcTrigger); + async fn trigger_states_gc(req: TriggerGcRequest); /// Sets memory profiler state. Returns whether the state was changed. async fn set_memory_profiler_enabled(enabled: bool) -> bool; @@ -46,6 +51,27 @@ pub trait ControlServer { /// Returns list of all block ids. async fn get_block_ids(req: BlockListRequest) -> ServerResult; + + /// Signs an elections payload. + async fn sign_elections_payload( + req: ElectionsPayloadRequest, + ) -> ServerResult; +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeInfoResponse { + // TODO: Somehow expose tycho_network::Address? + pub public_addr: String, + pub local_addr: SocketAddr, + pub adnl_id: HashBytes, + pub validator_public_key: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "ty", content = "seqno")] +pub enum TriggerGcRequest { + Exact(u32), + Distance(u32), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -99,3 +125,21 @@ pub struct BlockListResponse { pub blocks: Vec, pub continuation: Option, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct ElectionsPayloadRequest { + pub election_id: u32, + pub address: HashBytes, + pub max_factor: u32, + pub public_key: HashBytes, + pub adnl_addr: HashBytes, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ElectionsPayloadResponse { + // TODO: Add `serde(with = "base64")` + pub data: Vec, + pub public_key: HashBytes, + #[serde(with = "serde_helpers::signature")] + pub signature: Box<[u8; 64]>, +} diff --git a/control/src/server.rs b/control/src/server.rs index 98670402f..5ff755f05 100644 --- a/control/src/server.rs +++ b/control/src/server.rs @@ -2,14 +2,17 @@ use std::num::NonZeroU64; use std::path::PathBuf; use std::sync::Arc; +use everscale_crypto::ed25519; +use everscale_types::cell::HashBytes; use futures_util::future::BoxFuture; use futures_util::{FutureExt, StreamExt}; use serde::{Deserialize, Serialize}; use tarpc::server::Channel; use tycho_core::block_strider::{GcSubscriber, ManualGcTrigger}; +use tycho_network::Network; use tycho_storage::{ArchiveId, Storage}; -use crate::error::ServerResult; +use crate::error::{ServerError, ServerResult}; use crate::profiler::{MemoryProfiler, StubMemoryProfiler}; use crate::proto::{self, ArchiveInfo, ControlServer as _}; @@ -28,6 +31,11 @@ pub struct ControlServerConfig { /// /// Default: `true` pub overwrite_socket: bool, + + /// Maximum number of parallel connections. + /// + /// Default: `100` + pub max_connections: usize, } impl Default for ControlServerConfig { @@ -35,6 +43,7 @@ impl Default for ControlServerConfig { Self { socket_path: crate::DEFAULT_SOCKET_PATH.into(), overwrite_socket: true, + max_connections: 100, } } } @@ -71,8 +80,8 @@ impl ControlEndpoint { futures_util::future::ready(()) }) }) - // Max 1 channel. - .buffer_unordered(1) + // Max N channels. + .buffer_unordered(config.max_connections) .for_each(|_| async {}) .boxed(); @@ -90,60 +99,86 @@ impl Drop for ControlEndpoint { } } -pub struct ControlServerBuilder { +pub struct ControlServerBuilder { mandatory_fields: MandatoryFields, memory_profiler: Option>, + validator_keypair: Option>, } impl ControlServerBuilder { pub fn build(self) -> ControlServer { - let (storage, gc_subscriber) = self.mandatory_fields; + let (network, storage, gc_subscriber) = self.mandatory_fields; let memory_profiler = self .memory_profiler .unwrap_or_else(|| Arc::new(StubMemoryProfiler)); + let info = proto::NodeInfoResponse { + public_addr: network.remote_addr().to_string(), + local_addr: network.local_addr(), + adnl_id: HashBytes(network.peer_id().to_bytes()), + validator_public_key: self + .validator_keypair + .as_ref() + .map(|k| HashBytes(k.public_key.to_bytes())), + }; + ControlServer { inner: Arc::new(Inner { + info, gc_subscriber, storage, memory_profiler, + validator_keypair: self.validator_keypair, }), } } } -impl ControlServerBuilder<((), T2)> { - pub fn with_storage(self, storage: Storage) -> ControlServerBuilder<(Storage, T2)> { - let (_, t2) = self.mandatory_fields; +impl ControlServerBuilder<((), T2, T3)> { + pub fn with_network(self, network: &Network) -> ControlServerBuilder<(Network, T2, T3)> { + let (_, t2, t3) = self.mandatory_fields; + ControlServerBuilder { + mandatory_fields: (network.clone(), t2, t3), + memory_profiler: self.memory_profiler, + validator_keypair: self.validator_keypair, + } + } +} + +impl ControlServerBuilder<(T1, (), T3)> { + pub fn with_storage(self, storage: Storage) -> ControlServerBuilder<(T1, Storage, T3)> { + let (t1, _, t3) = self.mandatory_fields; ControlServerBuilder { - mandatory_fields: (storage, t2), + mandatory_fields: (t1, storage, t3), memory_profiler: self.memory_profiler, + validator_keypair: self.validator_keypair, } } } -impl ControlServerBuilder<(T1, ())> { +impl ControlServerBuilder<(T1, T2, ())> { pub fn with_gc_subscriber( self, gc_subscriber: GcSubscriber, - ) -> ControlServerBuilder<(T1, GcSubscriber)> { - let (t1, _) = self.mandatory_fields; + ) -> ControlServerBuilder<(T1, T2, GcSubscriber)> { + let (t1, t2, _) = self.mandatory_fields; ControlServerBuilder { - mandatory_fields: (t1, gc_subscriber), + mandatory_fields: (t1, t2, gc_subscriber), memory_profiler: self.memory_profiler, + validator_keypair: self.validator_keypair, } } } impl ControlServerBuilder { - pub fn with_memory_profiler( - self, - memory_profiler: Arc, - ) -> ControlServerBuilder { - ControlServerBuilder { - mandatory_fields: self.mandatory_fields, - memory_profiler: Some(memory_profiler), - } + pub fn with_memory_profiler(mut self, memory_profiler: Arc) -> Self { + self.memory_profiler = Some(memory_profiler); + self + } + + pub fn with_validator_keypair(mut self, keypair: Arc) -> Self { + self.validator_keypair = Some(keypair); + self } } @@ -154,10 +189,11 @@ pub struct ControlServer { } impl ControlServer { - pub fn builder() -> ControlServerBuilder<((), ())> { + pub fn builder() -> ControlServerBuilder<((), (), ())> { ControlServerBuilder { - mandatory_fields: ((), ()), + mandatory_fields: ((), (), ()), memory_profiler: None, + validator_keypair: None, } } } @@ -167,16 +203,20 @@ impl proto::ControlServer for ControlServer { tycho_util::time::now_millis() } - async fn trigger_archives_gc(self, _: Context, trigger: ManualGcTrigger) { - self.inner.gc_subscriber.trigger_archives_gc(trigger); + async fn get_node_info(self, _: tarpc::context::Context) -> proto::NodeInfoResponse { + self.inner.info.clone() } - async fn trigger_blocks_gc(self, _: Context, trigger: ManualGcTrigger) { - self.inner.gc_subscriber.trigger_blocks_gc(trigger); + async fn trigger_archives_gc(self, _: Context, req: proto::TriggerGcRequest) { + self.inner.gc_subscriber.trigger_archives_gc(req.into()); } - async fn trigger_states_gc(self, _: Context, trigger: ManualGcTrigger) { - self.inner.gc_subscriber.trigger_states_gc(trigger); + async fn trigger_blocks_gc(self, _: Context, req: proto::TriggerGcRequest) { + self.inner.gc_subscriber.trigger_blocks_gc(req.into()); + } + + async fn trigger_states_gc(self, _: Context, req: proto::TriggerGcRequest) { + self.inner.gc_subscriber.trigger_states_gc(req.into()); } async fn set_memory_profiler_enabled(self, _: Context, enabled: bool) -> bool { @@ -302,12 +342,71 @@ impl proto::ControlServer for ControlServer { continuation, }) } + + async fn sign_elections_payload( + self, + _: tarpc::context::Context, + req: proto::ElectionsPayloadRequest, + ) -> ServerResult { + let Some(keypair) = self.inner.validator_keypair.as_ref() else { + return Err(ServerError::new( + "control server was created without a keystore", + )); + }; + + if keypair.public_key.as_bytes() != req.public_key.as_array() { + return Err(ServerError::new( + "no validator key found for the specified public key", + )); + } + + let data = build_elections_data_to_sign(&req); + let signature = keypair.sign_raw(&data); + + Ok(proto::ElectionsPayloadResponse { + data, + public_key: HashBytes(keypair.public_key.to_bytes()), + signature: Box::new(signature), + }) + } } struct Inner { + info: proto::NodeInfoResponse, gc_subscriber: GcSubscriber, storage: Storage, memory_profiler: Arc, + validator_keypair: Option>, } type Context = tarpc::context::Context; + +impl From for proto::TriggerGcRequest { + fn from(value: ManualGcTrigger) -> Self { + match value { + ManualGcTrigger::Exact(mc_seqno) => Self::Exact(mc_seqno), + ManualGcTrigger::Distance(distance) => Self::Distance(distance), + } + } +} + +impl From for ManualGcTrigger { + fn from(value: proto::TriggerGcRequest) -> Self { + match value { + proto::TriggerGcRequest::Exact(mc_seqno) => Self::Exact(mc_seqno), + proto::TriggerGcRequest::Distance(distance) => Self::Distance(distance), + } + } +} + +fn build_elections_data_to_sign(req: &proto::ElectionsPayloadRequest) -> Vec { + const TL_ID: u32 = 0x654C5074; + + let mut data = Vec::with_capacity(4 + 4 + 4 + 32 + 32); + data.extend_from_slice(&TL_ID.to_be_bytes()); + data.extend_from_slice(&req.election_id.to_be_bytes()); + data.extend_from_slice(&req.max_factor.to_be_bytes()); + data.extend_from_slice(req.address.as_slice()); + data.extend_from_slice(req.adnl_addr.as_array()); + data +} diff --git a/network/src/types/peer_info.rs b/network/src/types/peer_info.rs index e9117842e..43fe66a22 100644 --- a/network/src/types/peer_info.rs +++ b/network/src/types/peer_info.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use tl_proto::{TlRead, TlWrite}; -use tycho_util::tl; +use tycho_util::{serde_helpers, tl}; use crate::types::{Address, PeerId}; use crate::util::check_peer_signature; @@ -25,7 +25,7 @@ pub struct PeerInfo { /// Unix timestamp up to which the info is valid. pub expires_at: u32, /// A `ed25519` signature of the info. - #[serde(with = "serde_signature")] + #[serde(with = "serde_helpers::signature")] #[tl(signature, with = "tl::signature_owned")] pub signature: Box<[u8; 64]>, } @@ -98,51 +98,6 @@ mod tl_address_list { } } -mod serde_signature { - use base64::engine::Engine as _; - use base64::prelude::BASE64_STANDARD; - use tycho_util::serde_helpers::{BorrowedStr, BytesVisitor}; - - use super::*; - - pub fn serialize(data: &[u8; 64], serializer: S) -> Result - where - S: serde::Serializer, - { - if serializer.is_human_readable() { - serializer.serialize_str(&BASE64_STANDARD.encode(data)) - } else { - data.serialize(serializer) - } - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - use serde::de::Error; - - if deserializer.is_human_readable() { - as Deserialize>::deserialize(deserializer).and_then( - |BorrowedStr(s)| { - let mut buffer = [0u8; 66]; - match BASE64_STANDARD.decode_slice(s.as_ref(), &mut buffer) { - Ok(64) => { - let [data @ .., _, _] = buffer; - Ok(Box::new(data)) - } - _ => Err(Error::custom("Invalid signature")), - } - }, - ) - } else { - deserializer - .deserialize_bytes(BytesVisitor::<64>) - .map(Box::new) - } - } -} - #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/util/Cargo.toml b/util/Cargo.toml index 254209182..cb53eb105 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true # crates.io deps ahash = { workspace = true } anyhow = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } castaway = { workspace = true } dashmap = { workspace = true } diff --git a/util/src/serde_helpers.rs b/util/src/serde_helpers.rs index afa5772e7..0c1e74cdd 100644 --- a/util/src/serde_helpers.rs +++ b/util/src/serde_helpers.rs @@ -245,6 +245,50 @@ pub mod option_string { } } +pub mod signature { + use base64::engine::Engine as _; + use base64::prelude::BASE64_STANDARD; + + use super::*; + + pub fn serialize(data: &[u8; 64], serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + serializer.serialize_str(&BASE64_STANDARD.encode(data)) + } else { + data.serialize(serializer) + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + use serde::de::Error; + + if deserializer.is_human_readable() { + as Deserialize>::deserialize(deserializer).and_then( + |BorrowedStr(s)| { + let mut buffer = [0u8; 66]; + match BASE64_STANDARD.decode_slice(s.as_ref(), &mut buffer) { + Ok(64) => { + let [data @ .., _, _] = buffer; + Ok(Box::new(data)) + } + _ => Err(Error::custom("Invalid signature")), + } + }, + ) + } else { + deserializer + .deserialize_bytes(BytesVisitor::<64>) + .map(Box::new) + } + } +} + #[derive(Deserialize)] #[repr(transparent)] pub struct BorrowedStr<'a>(#[serde(borrow)] pub Cow<'a, str>);