From e4f5d65d471954b9cc1148ed067e9bb3f598bb7a Mon Sep 17 00:00:00 2001
From: Elvis
Date: Fri, 9 Jun 2023 16:30:39 +0200
Subject: [PATCH] sync with peers before producing blocks (#1169)

Closes https://github.com/FuelLabs/fuel-core/issues/1120

After refactoring `Services` so that they can be turned into a `Task` with a
`TaskParam`, I created `SyncPort` and added it as a `TaskParam` for the `poa`
service.

In the current implementation, the `SyncAdapter` only checks for 60 seconds
whether a new block was imported; if none was, it considers the PoA node to be
synced. An improvement on top of this would be to first wait until the node is
connected to N reserved peers.

See the sketch appended after the diff for how `MainTask` consumes the sync
state.

---------

Co-authored-by: Green Baneling
---
 .../tests/integration_tests.rs                |   1 +
 crates/chain-config/src/config/chain.rs       |   6 +-
 crates/chain-config/src/config/consensus.rs   |  15 +-
 crates/fuel-core/src/service.rs               |   5 +-
 .../service/adapters/consensus_module/poa.rs  |  46 ++-
 crates/fuel-core/src/service/config.rs        |   1 +
 crates/fuel-core/src/service/genesis.rs       |   5 +-
 crates/fuel-core/src/service/sub_services.rs  |  24 +-
 .../consensus_module/poa/src/config.rs        |  18 +-
 .../services/consensus_module/poa/src/lib.rs  |   1 +
 .../consensus_module/poa/src/ports.rs         |  20 +-
 .../consensus_module/poa/src/service.rs       |  91 ++++-
 .../consensus_module/poa/src/service_test.rs  |  54 ++-
 .../service_test/manually_produce_tests.rs    |   5 +
 .../poa/src/service_test/trigger_tests.rs     |  25 +-
 .../services/consensus_module/poa/src/sync.rs | 377 ++++++++++++++++++
 crates/services/importer/src/importer.rs      |  12 +-
 crates/services/importer/src/importer/test.rs |  10 +-
 crates/services/p2p/src/p2p_service.rs        |   4 +-
 crates/services/p2p/src/peer_manager.rs       |  22 +-
 crates/services/p2p/src/service.rs            |  11 +
 .../txpool/src/service/test_helpers.rs        |   8 +-
 crates/types/src/services/block_importer.rs   |  83 ++++
 23 files changed, 750 insertions(+), 94 deletions(-)
 create mode 100644 crates/services/consensus_module/poa/src/sync.rs

diff --git a/bin/e2e-test-client/tests/integration_tests.rs b/bin/e2e-test-client/tests/integration_tests.rs
index d03d48e6d2e..da0849e88ce 100644
--- a/bin/e2e-test-client/tests/integration_tests.rs
+++ b/bin/e2e-test-client/tests/integration_tests.rs
@@ -2,6 +2,7 @@ use fuel_core::service::{
     Config,
     FuelService,
 };
+// Add methods on commands
 use fuel_core_e2e_client::config::SuiteConfig;
 use std::fs;
diff --git a/crates/chain-config/src/config/chain.rs b/crates/chain-config/src/config/chain.rs
index 7519570b361..afba2549e6a 100644
--- a/crates/chain-config/src/config/chain.rs
+++ b/crates/chain-config/src/config/chain.rs
@@ -7,7 +7,6 @@ use fuel_core_types::{
     fuel_crypto::Hasher,
     fuel_tx::{
         ConsensusParameters,
-        Input,
         UtxoId,
     },
     fuel_types::{
@@ -46,7 +45,6 @@ use crate::{
         coin::CoinConfig,
         state::StateConfig,
     },
-    default_consensus_dev_key,
     genesis::GenesisCommitment,
     ConsensusConfig,
 };
@@ -81,9 +79,7 @@ impl Default for ChainConfig {
             transaction_parameters: ConsensusParameters::DEFAULT,
             initial_state: None,
             gas_costs: GasCosts::default(),
-            consensus: ConsensusConfig::PoA {
-                signing_key: Input::owner(&default_consensus_dev_key().public_key()),
-            },
+            consensus: ConsensusConfig::default_poa(),
         }
     }
 }
diff --git a/crates/chain-config/src/config/consensus.rs b/crates/chain-config/src/config/consensus.rs
index c51cdd67891..c5c14ec0172 100644
--- a/crates/chain-config/src/config/consensus.rs
+++ b/crates/chain-config/src/config/consensus.rs
@@ -1,10 +1,23 @@
-use fuel_core_types::fuel_types::Address;
+use fuel_core_types::{
+
fuel_tx::Input, + fuel_types::Address, +}; use serde::{ Deserialize, Serialize, }; +use crate::default_consensus_dev_key; + #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] pub enum ConsensusConfig { PoA { signing_key: Address }, } + +impl ConsensusConfig { + pub fn default_poa() -> Self { + ConsensusConfig::PoA { + signing_key: Input::owner(&default_consensus_dev_key().public_key()), + } + } +} diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 747c56f4c6f..8282885d720 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -310,6 +310,7 @@ mod tests { i += 1; } + // current services: graphql, txpool, PoA #[allow(unused_mut)] let mut expected_services = 3; @@ -320,8 +321,8 @@ mod tests { // } #[cfg(feature = "p2p")] { - // p2p - expected_services += 1; + // p2p & sync + expected_services += 2; } // # Dev-note: Update the `expected_services` when we add/remove a new/old service. diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index 33126d0e6d2..46ed86fdc1b 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -1,9 +1,12 @@ +use std::ops::Deref; + use crate::{ database::Database, fuel_core_graphql_api::ports::ConsensusModulePort, service::adapters::{ BlockImporterAdapter, BlockProducerAdapter, + P2PAdapter, PoAAdapter, TxPoolAdapter, }, @@ -12,6 +15,7 @@ use anyhow::anyhow; use fuel_core_poa::{ ports::{ BlockImporter, + P2pPort, TransactionPool, }, service::SharedState, @@ -23,12 +27,19 @@ use fuel_core_types::{ fuel_tx::TxId, fuel_types::BlockHeight, services::{ - block_importer::UncommittedResult as UncommittedImporterResult, + block_importer::{ + BlockImportInfo, + UncommittedResult as UncommittedImporterResult, + }, executor::UncommittedResult, txpool::ArcPoolTx, }, tai64::Tai64, }; +use tokio_stream::{ + wrappers::BroadcastStream, + StreamExt, +}; impl PoAAdapter { pub fn new(shared_state: Option) -> Self { @@ -65,10 +76,6 @@ impl TransactionPool for TxPoolAdapter { } fn transaction_status_events(&self) -> BoxStream { - use tokio_stream::{ - wrappers::BroadcastStream, - StreamExt, - }; Box::pin( BroadcastStream::new(self.service.new_tx_notification_subscribe()) .filter_map(|result| result.ok()), @@ -103,4 +110,33 @@ impl BlockImporter for BlockImporterAdapter { .commit_result(result) .map_err(Into::into) } + + fn block_stream(&self) -> BoxStream { + Box::pin( + BroadcastStream::new(self.block_importer.subscribe()) + .filter_map(|result| result.ok()) + .map(|r| r.deref().into()), + ) + } +} + +#[cfg(feature = "p2p")] +impl P2pPort for P2PAdapter { + fn reserved_peers_count(&self) -> BoxStream { + if let Some(service) = &self.service { + Box::pin( + BroadcastStream::new(service.subscribe_reserved_peers_count()) + .filter_map(|result| result.ok()), + ) + } else { + Box::pin(tokio_stream::pending()) + } + } +} + +#[cfg(not(feature = "p2p"))] +impl P2pPort for P2PAdapter { + fn reserved_peers_count(&self) -> BoxStream { + Box::pin(tokio_stream::pending()) + } } diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index dfec5dc28cd..c554bdef495 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -118,6 +118,7 @@ impl TryFrom<&Config> for fuel_core_poa::Config { signing_key: config.consensus_key.clone(), metrics: false, consensus_params: 
config.chain_conf.transaction_parameters,
+            ..Default::default()
         })
     }
 }
diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs
index c229b649407..a648f5c5557 100644
--- a/crates/fuel-core/src/service/genesis.rs
+++ b/crates/fuel-core/src/service/genesis.rs
@@ -145,10 +145,7 @@ fn import_genesis_block(
         (),
     );
     importer.commit_result(UncommittedImportResult::new(
-        ImportResult {
-            sealed_block: block,
-            tx_status: vec![],
-        },
+        ImportResult::new_from_local(block, vec![]),
         database_transaction,
     ))?;
     Ok(())
diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs
index 584799562c0..cca69abdc7b 100644
--- a/crates/fuel-core/src/service/sub_services.rs
+++ b/crates/fuel-core/src/service/sub_services.rs
@@ -125,6 +125,7 @@ pub fn init_sub_services(
     let poa_config: fuel_core_poa::Config = config.try_into()?;
     let production_enabled =
         !matches!(poa_config.trigger, Trigger::Never) || config.manual_blocks_enabled;
+
     let poa = (production_enabled).then(|| {
         fuel_core_poa::new_service(
             last_block.header(),
             poa_config,
             tx_pool_adapter.clone(),
             producer_adapter.clone(),
             importer_adapter.clone(),
+            p2p_adapter.clone(),
         )
     });
     let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));

 #[cfg(feature = "p2p")]
-    let sync = (!production_enabled)
-        .then(|| {
-            fuel_core_sync::service::new_service(
-                *last_block.header().height(),
-                p2p_adapter,
-                importer_adapter.clone(),
-                verifier,
-                config.sync,
-            )
-        })
-        .transpose()?;
+    let sync = fuel_core_sync::service::new_service(
+        *last_block.header().height(),
+        p2p_adapter,
+        importer_adapter.clone(),
+        verifier,
+        config.sync,
+    )?;

     // TODO: Figure out how to move it into `fuel-core-graphql-api`.
let schema = { @@ -219,9 +217,7 @@ pub fn init_sub_services( { if let Some(network) = network.take() { services.push(Box::new(network)); - if let Some(sync) = sync { - services.push(Box::new(sync)); - } + services.push(Box::new(sync)); } } diff --git a/crates/services/consensus_module/poa/src/config.rs b/crates/services/consensus_module/poa/src/config.rs index a6e5dc1cc13..e766d9fd9c2 100644 --- a/crates/services/consensus_module/poa/src/config.rs +++ b/crates/services/consensus_module/poa/src/config.rs @@ -6,13 +6,29 @@ use fuel_core_types::{ }; use tokio::time::Duration; -#[derive(Default, Debug, Clone)] +#[derive(Debug, Clone)] pub struct Config { pub trigger: Trigger, pub block_gas_limit: Word, pub signing_key: Option>, pub metrics: bool, pub consensus_params: ConsensusParameters, + pub min_connected_reserved_peers: usize, + pub time_until_synced: Duration, +} + +impl Default for Config { + fn default() -> Self { + Config { + trigger: Trigger::default(), + block_gas_limit: 0, + signing_key: None, + metrics: false, + consensus_params: ConsensusParameters::default(), + min_connected_reserved_peers: 0, + time_until_synced: Duration::ZERO, + } + } } /// Block production trigger for PoA operation diff --git a/crates/services/consensus_module/poa/src/lib.rs b/crates/services/consensus_module/poa/src/lib.rs index af98e73bb45..5fd3256d594 100644 --- a/crates/services/consensus_module/poa/src/lib.rs +++ b/crates/services/consensus_module/poa/src/lib.rs @@ -2,6 +2,7 @@ #![deny(unused_must_use)] mod deadline_clock; +mod sync; #[cfg(test)] mod service_test; diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index cf88f71b262..967f64ef94c 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -15,7 +15,10 @@ use fuel_core_types::{ Bytes32, }, services::{ - block_importer::UncommittedResult as UncommittedImportResult, + block_importer::{ + BlockImportInfo, + UncommittedResult as UncommittedImportResult, + }, executor::UncommittedResult as UncommittedExecutionResult, txpool::ArcPoolTx, }, @@ -58,6 +61,8 @@ pub trait BlockImporter: Send + Sync { &self, result: UncommittedImportResult>, ) -> anyhow::Result<()>; + + fn block_stream(&self) -> BoxStream; } #[cfg_attr(test, mockall::automock)] @@ -83,3 +88,16 @@ pub trait RelayerPort { max_da_lag: &DaBlockHeight, ) -> anyhow::Result<()>; } + +#[cfg_attr(test, mockall::automock)] +pub trait P2pPort: Send + Sync + 'static { + /// Subscribe to reserved peers connection updates. 
+ fn reserved_peers_count(&self) -> BoxStream; +} + +#[async_trait::async_trait] +#[cfg_attr(test, mockall::automock)] +pub trait SyncPort: Send + Sync { + /// await synchronization with the peers + async fn sync_with_peers(&mut self) -> anyhow::Result<()>; +} diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 88d9780902e..62148196376 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -6,8 +6,13 @@ use crate::{ ports::{ BlockImporter, BlockProducer, + P2pPort, TransactionPool, }, + sync::{ + SyncState, + SyncTask, + }, Config, Trigger, }; @@ -19,6 +24,7 @@ use fuel_core_services::{ stream::BoxStream, RunnableService, RunnableTask, + Service as _, ServiceRunner, StateWatcher, }; @@ -70,8 +76,7 @@ use tokio::{ use tokio_stream::StreamExt; use tracing::error; -pub type Service = ServiceRunner>; - +pub type Service = ServiceRunner>; #[derive(Clone)] pub struct SharedState { request_sender: mpsc::Sender, @@ -121,7 +126,7 @@ pub(crate) enum RequestType { Trigger, } -pub struct Task { +pub struct MainTask { block_gas_limit: Word, signing_key: Option>, block_producer: B, @@ -137,18 +142,21 @@ pub struct Task { /// Deadline clock, used by the triggers timer: DeadlineClock, consensus_params: ConsensusParameters, + sync_task_handle: ServiceRunner, } -impl Task +impl MainTask where T: TransactionPool, + I: BlockImporter, { - pub fn new( + pub fn new( last_block: &BlockHeader, config: Config, txpool: T, block_producer: B, block_importer: I, + p2p_port: P, ) -> Self { let tx_status_update_stream = txpool.transaction_status_events(); let (request_sender, request_receiver) = mpsc::channel(100); @@ -156,21 +164,47 @@ where let duration = Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0)); let last_block_created = Instant::now() - duration; + + let block_stream = block_importer.block_stream(); + let peer_connections_stream = p2p_port.reserved_peers_count(); + let last_block_height = *last_block.height(); + + let Config { + block_gas_limit, + signing_key, + consensus_params, + min_connected_reserved_peers, + time_until_synced, + trigger, + .. 
+ } = config; + + let sync_task = SyncTask::new( + peer_connections_stream, + min_connected_reserved_peers, + time_until_synced, + block_stream, + last_block_height, + ); + + let sync_task_handle = ServiceRunner::new(sync_task); + Self { - block_gas_limit: config.block_gas_limit, - signing_key: config.signing_key, + block_gas_limit, + signing_key, txpool, block_producer, block_importer, tx_status_update_stream, request_receiver, shared_state: SharedState { request_sender }, - last_height: *last_block.height(), + last_height: last_block_height, last_timestamp, last_block_created, - trigger: config.trigger, + trigger, timer: DeadlineClock::new(), - consensus_params: config.consensus_params, + consensus_params, + sync_task_handle, } } @@ -204,7 +238,7 @@ where } } -impl Task +impl MainTask where T: TransactionPool, B: BlockProducer, @@ -289,10 +323,7 @@ where }; // Import the sealed block self.block_importer.commit_result(Uncommitted::new( - ImportResult { - sealed_block: block, - tx_status, - }, + ImportResult::new_from_local(block, tx_status), db_transaction, ))?; @@ -410,14 +441,14 @@ where } #[async_trait::async_trait] -impl RunnableService for Task +impl RunnableService for MainTask where Self: RunnableTask, { const NAME: &'static str = "PoA"; type SharedData = SharedState; - type Task = Task; + type Task = MainTask; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -429,6 +460,8 @@ where _: &StateWatcher, _: Self::TaskParams, ) -> anyhow::Result { + self.sync_task_handle.start_and_await().await?; + match self.trigger { Trigger::Never | Trigger::Instant => {} Trigger::Interval { block_time } => { @@ -442,20 +475,31 @@ where .await; } }; + Ok(self) } } #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + // make sure we're synced first + if *self.sync_task_handle.shared.borrow() == SyncState::NotSynced { + tokio::select! { + biased; + _ = watcher.while_started() => {} + _ = self.sync_task_handle.shared.changed() => {} + } + } + let should_continue; tokio::select! { + biased; _ = watcher.while_started() => { should_continue = false; } @@ -495,30 +539,33 @@ where } async fn shutdown(self) -> anyhow::Result<()> { - // Nothing to shut down because we don't have any temporary state that should be dumped, - // and we don't spawn any sub-tasks that we need to finish or await. 
+        tracing::info!("PoA MainTask shutting down");
+        self.sync_task_handle.stop_and_await().await?;
         Ok(())
     }
 }

-pub fn new_service(
+pub fn new_service(
     last_block: &BlockHeader,
     config: Config,
     txpool: T,
     block_producer: B,
     block_importer: I,
+    p2p_port: P,
 ) -> Service
 where
     T: TransactionPool + 'static,
     B: BlockProducer + 'static,
     I: BlockImporter + 'static,
+    P: P2pPort,
 {
-    Service::new(Task::new(
+    Service::new(MainTask::new(
         last_block,
         config,
         txpool,
         block_producer,
         block_importer,
+        p2p_port,
     ))
 }
diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs
index b2cb3fc8b52..c6d4b42826b 100644
--- a/crates/services/consensus_module/poa/src/service_test.rs
+++ b/crates/services/consensus_module/poa/src/service_test.rs
@@ -3,9 +3,10 @@ use crate::{
     ports::{
         MockBlockImporter,
         MockBlockProducer,
+        MockP2pPort,
         MockTransactionPool,
     },
-    service::Task,
+    service::MainTask,
     Config,
     Service,
     Trigger,
@@ -72,6 +73,16 @@ struct TestContextBuilder {
     producer: Option,
 }

+fn generate_p2p_port() -> MockP2pPort {
+    let mut p2p_port = MockP2pPort::default();
+
+    p2p_port
+        .expect_reserved_peers_count()
+        .returning(move || Box::pin(tokio_stream::pending()));
+
+    p2p_port
+}
+
 impl TestContextBuilder {
     fn new() -> Self {
         Self {
@@ -125,18 +136,24 @@
             let mut importer = MockBlockImporter::default();
             importer.expect_commit_result().returning(|_| Ok(()));
             importer
+                .expect_block_stream()
+                .returning(|| Box::pin(tokio_stream::pending()));
+            importer
         });
         let txpool = self
             .txpool
             .unwrap_or_else(MockTransactionPool::no_tx_updates);

+        let p2p_port = generate_p2p_port();
+
         let service = new_service(
             &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
             config,
             txpool,
             producer,
             importer,
+            p2p_port,
         );
         service.start().unwrap();
         TestContext { service }
@@ -274,6 +291,10 @@ async fn remove_skipped_transactions() {
         .times(1)
         .returning(|_| Ok(()));

+    block_importer
+        .expect_block_stream()
+        .returning(|| Box::pin(tokio_stream::pending()));
+
     let mut txpool = MockTransactionPool::no_tx_updates();
     // Test created only for this check.
txpool.expect_remove_txs().returning(move |skipped_ids| { @@ -300,14 +321,18 @@ async fn remove_skipped_transactions() { block_gas_limit: 1000000, signing_key: Some(Secret::new(secret_key.into())), metrics: false, - consensus_params: Default::default(), + ..Default::default() }; - let mut task = Task::new( + + let p2p_port = generate_p2p_port(); + + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, txpool, block_producer, block_importer, + p2p_port, ); assert!(task.produce_next_block().await.is_ok()); @@ -331,6 +356,9 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { block_importer .expect_commit_result() .returning(|_| panic!("Block importer should not be called")); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::pending())); let mut txpool = MockTransactionPool::no_tx_updates(); txpool.expect_total_consumable_gas().returning(|| 0); @@ -341,14 +369,18 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { block_gas_limit: 1000000, signing_key: Some(Secret::new(secret_key.into())), metrics: false, - consensus_params: Default::default(), + ..Default::default() }; - let mut task = Task::new( + + let p2p_port = generate_p2p_port(); + + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, txpool, block_producer, block_importer, + p2p_port, ); // simulate some txpool event to see if any block production is erroneously triggered @@ -376,6 +408,10 @@ async fn hybrid_production_doesnt_produce_empty_blocks_when_txpool_is_empty() { .expect_commit_result() .returning(|_| panic!("Block importer should not be called")); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::pending())); + let mut txpool = MockTransactionPool::no_tx_updates(); txpool.expect_total_consumable_gas().returning(|| 0); txpool.expect_pending_number().returning(|| 0); @@ -389,14 +425,18 @@ async fn hybrid_production_doesnt_produce_empty_blocks_when_txpool_is_empty() { block_gas_limit: 1000000, signing_key: Some(Secret::new(secret_key.into())), metrics: false, - consensus_params: Default::default(), + ..Default::default() }; - let task = Task::new( + + let p2p_port = generate_p2p_port(); + + let task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, txpool, block_producer, block_importer, + p2p_port, ); let service = Service::new(task); diff --git a/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs b/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs index 783521a6032..19fb28a9bf4 100644 --- a/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs +++ b/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs @@ -47,6 +47,7 @@ async fn can_manually_produce_block( signing_key: Some(test_signing_key()), metrics: false, consensus_params, + ..Default::default() }); // initialize txpool with some txs @@ -65,6 +66,10 @@ async fn can_manually_produce_block( .unwrap(); Ok(()) }); + importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::pending())); + let mut producer = MockBlockProducer::default(); producer .expect_produce_and_execute_block() diff --git a/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs b/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs index 9a8de6ae7b6..0efdb2d9dd4 100644 --- 
a/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs +++ b/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs @@ -20,7 +20,7 @@ async fn clean_startup_shutdown_each_trigger() -> anyhow::Result<()> { block_gas_limit: 100_000, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); let ctx = ctx_builder.build(); @@ -42,6 +42,7 @@ async fn never_trigger_never_produces_blocks() { signing_key: Some(test_signing_key()), metrics: false, consensus_params, + ..Default::default() }); // initialize txpool with some txs @@ -57,6 +58,9 @@ async fn never_trigger_never_produces_blocks() { importer .expect_commit_result() .returning(|_| panic!("Should not commit result")); + importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::pending())); ctx_builder.with_importer(importer); let ctx = ctx_builder.build(); for tx in txs { @@ -100,6 +104,9 @@ impl DefaultContext { block_import_sender.send(sealed_block)?; Ok(()) }); + importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::pending())); ctx_builder.with_importer(importer); let test_ctx = ctx_builder.build(); @@ -121,7 +128,7 @@ async fn instant_trigger_produces_block_instantly() { block_gas_limit: 100_000, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); ctx.status_sender.send_replace(Some(TxId::zeroed())); @@ -141,7 +148,7 @@ async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> { block_gas_limit: 100_000, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); ctx.status_sender.send_replace(Some(TxId::zeroed())); @@ -207,7 +214,7 @@ async fn interval_trigger_doesnt_react_to_full_txpool() -> anyhow::Result<()> { block_gas_limit: 100_000, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); // Brackets to release the lock. @@ -254,7 +261,7 @@ async fn hybrid_trigger_produces_blocks_correctly_max_block_time() -> anyhow::Re block_gas_limit: 100_000, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); // Make sure no blocks are produced yet @@ -301,7 +308,7 @@ async fn hybrid_trigger_produces_blocks_correctly_max_block_time_not_overrides_m block_gas_limit: Word::MAX, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); // Make sure no blocks are produced when txpool is empty and `MAX_BLOCK_TIME` is not exceeded @@ -344,7 +351,7 @@ async fn hybrid_trigger_produces_blocks_correctly_max_tx_idle_time() -> anyhow:: block_gas_limit: Word::MAX, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); assert!(matches!( @@ -396,7 +403,7 @@ async fn hybrid_trigger_produces_blocks_correctly_min_block_time_min_block_gas_l block_gas_limit: Word::MIN, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); // Emulate tx status update to trigger the execution. 
@@ -454,7 +461,7 @@ async fn hybrid_trigger_produces_blocks_correctly_min_block_time_max_block_gas_l block_gas_limit: Word::MAX, signing_key: Some(test_signing_key()), metrics: false, - consensus_params: Default::default(), + ..Default::default() }); // Emulate tx status update to trigger the execution. diff --git a/crates/services/consensus_module/poa/src/sync.rs b/crates/services/consensus_module/poa/src/sync.rs new file mode 100644 index 00000000000..74cee82c7fb --- /dev/null +++ b/crates/services/consensus_module/poa/src/sync.rs @@ -0,0 +1,377 @@ +use std::time::Duration; + +use fuel_core_services::{ + stream::BoxStream, + RunnableService, + RunnableTask, + StateWatcher, +}; +use fuel_core_types::{ + fuel_types::BlockHeight, + services::block_importer::BlockImportInfo, +}; + +use tokio::sync::watch; +use tokio_stream::StreamExt; + +use crate::deadline_clock::{ + DeadlineClock, + OnConflict, +}; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum SyncState { + NotSynced, + Synced, +} + +impl SyncState { + pub fn from_config( + min_connected_reserved_peers: usize, + time_until_synced: Duration, + ) -> SyncState { + if min_connected_reserved_peers == 0 && time_until_synced == Duration::ZERO { + SyncState::Synced + } else { + SyncState::NotSynced + } + } +} + +pub struct SyncTask { + min_connected_reserved_peers: usize, + time_until_synced: Duration, + peer_connections_stream: BoxStream, + block_stream: BoxStream, + state_sender: watch::Sender, + // shared with `MainTask` via SyncTask::SharedState + state_receiver: watch::Receiver, + inner_state: InnerSyncState, + timer: DeadlineClock, +} + +impl SyncTask { + pub fn new( + peer_connections_stream: BoxStream, + min_connected_reserved_peers: usize, + time_until_synced: Duration, + block_stream: BoxStream, + block_height: BlockHeight, + ) -> Self { + let inner_state = InnerSyncState::from_config( + min_connected_reserved_peers, + time_until_synced, + block_height, + ); + let timer = DeadlineClock::new(); + + let initial_sync_state = + SyncState::from_config(min_connected_reserved_peers, time_until_synced); + + let (state_sender, state_receiver) = + tokio::sync::watch::channel(initial_sync_state); + + Self { + peer_connections_stream, + min_connected_reserved_peers, + time_until_synced, + block_stream, + state_sender, + state_receiver, + inner_state, + timer, + } + } + + fn update_sync_state(&mut self, new_state: SyncState) { + self.state_sender + .send_if_modified(|sync_state: &mut SyncState| { + if new_state == *sync_state { + false + } else { + *sync_state = new_state; + true + } + }); + } + + async fn restart_timer(&mut self) { + self.timer + .set_timeout(self.time_until_synced, OnConflict::Overwrite) + .await; + } +} + +#[async_trait::async_trait] +impl RunnableService for SyncTask { + const NAME: &'static str = "fuel-core-consensus/poa/sync-task"; + + type SharedData = watch::Receiver; + type TaskParams = (); + + type Task = SyncTask; + + fn shared_data(&self) -> Self::SharedData { + self.state_receiver.clone() + } + + async fn into_task( + self, + _: &StateWatcher, + _: Self::TaskParams, + ) -> anyhow::Result { + Ok(self) + } +} + +#[async_trait::async_trait] +impl RunnableTask for SyncTask { + #[tracing::instrument(level = "debug", skip_all, err, ret)] + async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let mut should_continue = true; + + tokio::select! 
{
+            _ = watcher.while_started() => {
+                should_continue = false;
+            }
+            Some(latest_peer_count) = self.peer_connections_stream.next() => {
+                let sufficient_peers = latest_peer_count >= self.min_connected_reserved_peers;
+
+                match self.inner_state {
+                    InnerSyncState::InsufficientPeers(block_height) if sufficient_peers => {
+                        self.inner_state = InnerSyncState::SufficientPeers(block_height);
+                        self.restart_timer().await;
+                    }
+                    InnerSyncState::SufficientPeers(block_height) if !sufficient_peers => {
+                        self.inner_state = InnerSyncState::InsufficientPeers(block_height);
+                        self.timer.clear().await;
+                    }
+                    InnerSyncState::Synced { block_height, .. } => {
+                        self.inner_state = InnerSyncState::Synced { block_height, has_sufficient_peers: sufficient_peers };
+                    }
+                    _ => {},
+                }
+            }
+            Some(block_info) = self.block_stream.next() => {
+                let new_block_height = block_info.height;
+
+                match self.inner_state {
+                    InnerSyncState::InsufficientPeers(block_height) if new_block_height > block_height => {
+                        self.inner_state = InnerSyncState::InsufficientPeers(new_block_height);
+                    }
+                    InnerSyncState::SufficientPeers(block_height) if new_block_height > block_height => {
+                        self.inner_state = InnerSyncState::SufficientPeers(new_block_height);
+                        self.restart_timer().await;
+                    }
+                    InnerSyncState::Synced { block_height, has_sufficient_peers } if new_block_height > block_height => {
+                        if block_info.is_locally_produced() {
+                            self.inner_state = InnerSyncState::Synced { block_height: new_block_height, has_sufficient_peers };
+                        } else {
+                            // we considered ourselves to be synced, but we're obviously not!
+                            if has_sufficient_peers {
+                                self.inner_state = InnerSyncState::SufficientPeers(new_block_height);
+                                self.restart_timer().await;
+                            } else {
+                                self.inner_state = InnerSyncState::InsufficientPeers(new_block_height);
+                            }
+
+                            self.update_sync_state(SyncState::NotSynced);
+                        }
+                    }
+                    _ => {}
+                }
+            }
+            _ = self.timer.wait() => {
+                if let InnerSyncState::SufficientPeers(block_height) = self.inner_state {
+                    self.inner_state = InnerSyncState::Synced { block_height, has_sufficient_peers: true };
+                    self.update_sync_state(SyncState::Synced);
+                }
+            }
+        }
+
+        Ok(should_continue)
+    }
+
+    async fn shutdown(self) -> anyhow::Result<()> {
+        // Nothing to shut down because we don't have any temporary state that should be dumped,
+        // and we don't spawn any sub-tasks that we need to finish or await.
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+enum InnerSyncState {
+    /// We are not connected to at least `min_connected_reserved_peers` peers.
+    ///
+    /// InsufficientPeers -> SufficientPeers
+    InsufficientPeers(BlockHeight),
+    /// We are connected to at least `min_connected_reserved_peers` peers.
+    ///
+    /// SufficientPeers -> Synced(...)
+    /// SufficientPeers -> InsufficientPeers(...)
+    SufficientPeers(BlockHeight),
+    /// We enter this state if we didn't receive any notification about a new
+    /// block height from the network within the `time_until_synced` timeout.
+    ///
+    /// We can leave this state only if we receive a valid block from the
+    /// network with a higher block height.
+    ///
+    /// Synced -> either InsufficientPeers(...) or SufficientPeers(...)
+ Synced { + block_height: BlockHeight, + has_sufficient_peers: bool, + }, +} + +impl InnerSyncState { + fn from_config( + min_connected_reserved_peers: usize, + time_until_synced: Duration, + block_height: BlockHeight, + ) -> Self { + match (min_connected_reserved_peers, time_until_synced) { + (0, Duration::ZERO) => InnerSyncState::Synced { + block_height, + has_sufficient_peers: true, + }, + (0, _) => InnerSyncState::SufficientPeers(block_height), + _ => InnerSyncState::InsufficientPeers(block_height), + } + } + + #[cfg(test)] + fn block_height(&self) -> &BlockHeight { + match self { + InnerSyncState::InsufficientPeers(block_height) => block_height, + InnerSyncState::SufficientPeers(block_height) => block_height, + InnerSyncState::Synced { block_height, .. } => block_height, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ + collections::VecDeque, + pin::Pin, + task::{ + Context, + Poll, + }, + time::Duration, + }; + + use fuel_core_services::stream::IntoBoxStream; + + struct MockStream { + items: VecDeque, + } + + impl MockStream { + fn new(range: impl IntoIterator) -> Self { + Self { + items: range.into_iter().collect(), + } + } + } + + impl tokio_stream::Stream for MockStream + where + T: Unpin, + { + type Item = T; + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + if this.items.is_empty() { + Poll::Pending + } else { + let next_item = this.items.pop_front(); + Poll::Ready(next_item) + } + } + } + + #[tokio::test] + async fn test_sync_task() { + // given the following config + let connected_peers_report = 5; + let amount_of_updates_from_stream = 1; + let min_connected_reserved_peers = 5; + let biggest_block = 5; + let time_until_synced = Duration::from_secs(5); + + let connections_stream = + MockStream::new(vec![connected_peers_report; amount_of_updates_from_stream]) + .into_boxed(); + let block_stream = MockStream::new((1..biggest_block + 1).map(BlockHeight::from)) + .map(BlockImportInfo::from) + .into_boxed(); + + // and the SyncTask + let mut sync_task = SyncTask::new( + connections_stream, + min_connected_reserved_peers, + time_until_synced, + block_stream, + BlockHeight::default(), + ); + + // sync state should be NotSynced at the beginning + assert_eq!(SyncState::NotSynced, *sync_task.state_receiver.borrow()); + // we should have insufficient peers + assert!(matches!( + sync_task.inner_state, + InnerSyncState::InsufficientPeers(_) + )); + + let (_tx, shutdown) = + tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + + // given that we've performed a `run()` `amount_of_updates_from_stream + biggest_block` times + let run_times = amount_of_updates_from_stream + biggest_block as usize; + for _ in 0..run_times { + let _ = sync_task.run(&mut watcher).await; + } + + // the state should still be NotSynced + assert_eq!(SyncState::NotSynced, *sync_task.state_receiver.borrow()); + + // but we should have sufficient peers + assert!(matches!( + sync_task.inner_state, + InnerSyncState::SufficientPeers(_) + )); + + // and the block should be the latest one + assert_eq!( + sync_task.inner_state.block_height(), + &BlockHeight::from(biggest_block) + ); + + // given that we now run the task again + // both block stream and p2p connected peers updates stream would be empty + // hence the timeout should activate and expire + let _ = sync_task.run(&mut watcher).await; + + // at that point we should be in Synced state + assert_eq!(SyncState::Synced, 
*sync_task.state_receiver.borrow());
+
+        // the inner state should reflect that we are synced as well
+        assert!(matches!(
+            sync_task.inner_state,
+            InnerSyncState::Synced { .. }
+        ));
+
+        // and the block should still be the latest one
+        assert_eq!(
+            sync_task.inner_state.block_height(),
+            &BlockHeight::from(biggest_block)
+        );
+    }
+}
diff --git a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs
index 9b73343bb63..39915916f31 100644
--- a/crates/services/importer/src/importer.rs
+++ b/crates/services/importer/src/importer.rs
@@ -140,7 +140,7 @@ where
     ///
     /// # Concurrency
     ///
-    /// Only one commit may be in progress at the time. All other calls will be fail.
+    /// Only one commit may be in progress at a time. All other calls will fail.
     /// Returns an error if called while another call is in progress.
     pub fn commit_result(
         &self,
@@ -297,13 +297,11 @@ where
             return Err(Error::BlockIdMismatch(sealed_block_id, actual_block_id))
         }

-        let import_result = ImportResult {
-            sealed_block: Sealed {
-                entity: block,
-                consensus,
-            },
-            tx_status,
+        let sealed_block = Sealed {
+            entity: block,
+            consensus,
         };
+        let import_result = ImportResult::new_from_network(sealed_block, tx_status);

         Ok(Uncommitted::new(import_result, db_tx))
     }
diff --git a/crates/services/importer/src/importer/test.rs b/crates/services/importer/src/importer/test.rs
index 68e76705c23..e2eda1bd940 100644
--- a/crates/services/importer/src/importer/test.rs
+++ b/crates/services/importer/src/importer/test.rs
@@ -347,10 +347,7 @@ fn commit_result_assert(
     let expected_to_broadcast = sealed_block.clone();
     let importer = Importer::new(Default::default(), underlying_db, (), ());
     let uncommitted_result = UncommittedResult::new(
-        ImportResult {
-            sealed_block,
-            tx_status: vec![],
-        },
+        ImportResult::new_from_local(sealed_block, vec![]),
         StorageTransaction::new(executor_db),
     );

@@ -401,10 +398,7 @@ fn execute_and_commit_assert(
 fn commit_result_fail_when_locked() {
     let importer = Importer::new(Default::default(), MockDatabase::default(), (), ());
     let uncommitted_result = UncommittedResult::new(
-        ImportResult {
-            sealed_block: Default::default(),
-            tx_status: vec![],
-        },
+        ImportResult::default(),
         StorageTransaction::new(MockDatabase::default()),
     );

diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs
index c3856dd6bde..632b2351fca 100644
--- a/crates/services/p2p/src/p2p_service.rs
+++ b/crates/services/p2p/src/p2p_service.rs
@@ -1057,7 +1057,7 @@ mod tests {
         tokio::select!
{ event_from_first_guarded = first_guarded_node.next_event() => { if let Some(FuelP2PEvent::PeerConnected(peer_id)) = event_from_first_guarded { - if !first_sentry_set.remove(&peer_id) { + if !first_sentry_set.remove(&peer_id) { panic!("The node should only connect to the specified reserved nodes!"); } } @@ -1065,7 +1065,7 @@ mod tests { }, event_from_second_guarded = second_guarded_node.next_event() => { if let Some(FuelP2PEvent::PeerConnected(peer_id)) = event_from_second_guarded { - if !second_sentry_set.remove(&peer_id) { + if !second_sentry_set.remove(&peer_id) { panic!("The node should only connect to the specified reserved nodes!"); } } diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index d9eb082fb98..124487d64f1 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -65,6 +65,7 @@ pub struct PeerManager { reserved_peers: HashSet, connection_state: Arc>, max_non_reserved_peers: usize, + reserved_peers_updates: tokio::sync::broadcast::Sender, } impl PeerManager { @@ -73,6 +74,9 @@ impl PeerManager { connection_state: Arc>, max_non_reserved_peers: usize, ) -> Self { + let (reserved_peers_updates, _) = + tokio::sync::broadcast::channel(1 + reserved_peers.len() * 2); + Self { score_config: ScoreConfig::default(), non_reserved_connected_peers: HashMap::with_capacity(max_non_reserved_peers), @@ -80,9 +84,14 @@ impl PeerManager { reserved_peers, connection_state, max_non_reserved_peers, + reserved_peers_updates, } } + pub fn reserved_peers_updates(&self) -> tokio::sync::broadcast::Sender { + self.reserved_peers_updates.clone() + } + pub fn handle_gossip_score_update( &self, peer_id: PeerId, @@ -211,8 +220,11 @@ impl PeerManager { } false + } else if self.reserved_connected_peers.remove(&peer_id).is_some() { + self.send_reserved_peers_update(); + true } else { - self.reserved_connected_peers.remove(&peer_id).is_some() + false } } @@ -259,6 +271,8 @@ impl PeerManager { } else { self.reserved_connected_peers .insert(*peer_id, PeerInfo::default()); + + self.send_reserved_peers_update(); } self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); @@ -266,6 +280,12 @@ impl PeerManager { false } + fn send_reserved_peers_update(&self) { + let _ = self + .reserved_peers_updates + .send(self.reserved_connected_peers.len()); + } + fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) { let peers = if self.reserved_peers.contains(peer_id) { &mut self.reserved_connected_peers diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3b010c4d17f..344ea46fdd6 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -126,10 +126,14 @@ impl Task { let (request_sender, request_receiver) = mpsc::channel(100); let (tx_broadcast, _) = broadcast::channel(100); let (block_height_broadcast, _) = broadcast::channel(100); + let next_block_height = block_importer.next_block_height(); let max_block_size = config.max_block_size; let p2p_service = FuelP2PService::new(config, PostcardCodec::new(max_block_size)); + let reserved_peers_broadcast = + p2p_service.peer_manager().reserved_peers_updates(); + Self { p2p_service, db, @@ -138,6 +142,7 @@ impl Task { shared: SharedState { request_sender, tx_broadcast, + reserved_peers_broadcast, block_height_broadcast, }, } @@ -323,6 +328,8 @@ where pub struct SharedState { /// Sender of p2p transaction used for subscribing. 
tx_broadcast: broadcast::Sender,
+    /// Sender of reserved peers connection updates.
+    reserved_peers_broadcast: broadcast::Sender,
     /// Used for communicating with the `Task`.
     request_sender: mpsc::Sender,
     /// Sender of p2p block height data
@@ -440,6 +447,10 @@ impl SharedState {
         self.block_height_broadcast.subscribe()
     }

+    pub fn subscribe_reserved_peers_count(&self) -> broadcast::Receiver {
+        self.reserved_peers_broadcast.subscribe()
+    }
+
     pub fn report_peer(
         &self,
         peer_id: FuelPeerId,
diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs
index fdefc8fbe47..97c437119d0 100644
--- a/crates/services/txpool/src/service/test_helpers.rs
+++ b/crates/services/txpool/src/service/test_helpers.rs
@@ -115,11 +115,9 @@ impl MockImporter {
         let stream = fuel_core_services::stream::unfold(blocks, |mut blocks| async {
             let block = blocks.pop();
             if let Some(sealed_block) = block {
-                let result = ImportResult {
-                    sealed_block,
-                    tx_status: vec![],
-                };
-                let result = Arc::new(result);
+                let result =
+                    Arc::new(ImportResult::new_from_local(sealed_block, vec![]));
+
                 Some((result, blocks))
             } else {
                 core::future::pending().await
diff --git a/crates/types/src/services/block_importer.rs b/crates/types/src/services/block_importer.rs
index c31375df8da..f73e8040161 100644
--- a/crates/types/src/services/block_importer.rs
+++ b/crates/types/src/services/block_importer.rs
@@ -1,5 +1,7 @@
 //! Types related to block importer service.

+use fuel_vm_private::fuel_types::BlockHeight;
+
 use crate::{
     blockchain::SealedBlock,
     services::{
@@ -14,9 +16,90 @@ pub type UncommittedResult =

 /// The result of the block import.
 #[derive(Debug)]
+#[cfg_attr(any(test, feature = "test-helpers"), derive(Default))]
 pub struct ImportResult {
     /// Imported sealed block.
     pub sealed_block: SealedBlock,
     /// The status of the transactions execution included into the block.
     pub tx_status: Vec,
+    /// The source producer of the block.
+    pub source: Source,
+}
+
+/// The source producer of the block.
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
+pub enum Source {
+    /// The block was imported from the network.
+    Network,
+    /// The block was produced locally.
+    #[default]
+    Local,
+}
+
+impl ImportResult {
+    /// Creates a new `ImportResult` from the local producer.
+    pub fn new_from_local(
+        sealed_block: SealedBlock,
+        tx_status: Vec,
+    ) -> Self {
+        Self {
+            sealed_block,
+            tx_status,
+            source: Source::Local,
+        }
+    }
+
+    /// Creates a new `ImportResult` from the network.
+    pub fn new_from_network(
+        sealed_block: SealedBlock,
+        tx_status: Vec,
+    ) -> Self {
+        Self {
+            sealed_block,
+            tx_status,
+            source: Source::Network,
+        }
+    }
+}
+
+/// The block import info.
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
+pub struct BlockImportInfo {
+    /// The height of the imported block.
+    pub height: BlockHeight,
+    /// The producer of the block.
+    source: Source,
+}
+
+impl BlockImportInfo {
+    /// Returns `true` if the block was created locally.
+    pub fn is_locally_produced(&self) -> bool {
+        self.source == Source::Local
+    }
+
+    /// Creates a new `BlockImportInfo` with source from the network.
+ pub fn new_from_network(height: BlockHeight) -> Self { + Self { + height, + source: Source::Network, + } + } +} + +impl From<&ImportResult> for BlockImportInfo { + fn from(result: &ImportResult) -> Self { + Self { + height: *result.sealed_block.entity.header().height(), + source: result.source, + } + } +} + +impl From for BlockImportInfo { + fn from(height: BlockHeight) -> Self { + Self { + height, + source: Default::default(), + } + } }
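
Note for reviewers: the production gate introduced in `MainTask::run` is just a
`tokio::sync::watch` channel owned by the `SyncTask`; `MainTask` waits on it
until it reads `SyncState::Synced` before running any trigger logic. Below is a
minimal, self-contained sketch of that pattern, not part of the patch: the
`SyncState` enum mirrors the one in `sync.rs`, while the spawned task, the
100ms delay, and the final `println!` are illustrative stand-ins.

use std::time::Duration;

use tokio::sync::watch;

// Mirrors the `SyncState` added in crates/services/consensus_module/poa/src/sync.rs.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SyncState {
    NotSynced,
    Synced,
}

#[tokio::main]
async fn main() {
    // `SyncTask` owns the sender; `MainTask` holds the receiver as its view
    // of the shared sync state.
    let (state_sender, mut state_receiver) = watch::channel(SyncState::NotSynced);

    // Illustrative stand-in for `SyncTask`: once `time_until_synced` elapses
    // without a newer network block, it flips the shared state to `Synced`.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = state_sender.send(SyncState::Synced);
    });

    // Stand-in for the gate at the top of `MainTask::run`: block production
    // does not proceed while the shared state still reads `NotSynced`.
    while *state_receiver.borrow() == SyncState::NotSynced {
        state_receiver
            .changed()
            .await
            .expect("sync task dropped the sender");
    }

    // Only now would the PoA trigger logic (Never/Instant/Interval/Hybrid) run.
    println!("synced with peers; block production may start");
}

In the real task the select over the watch channel is `biased` toward the
shutdown signal, so a node that is stopping never stays blocked on
synchronization.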