Skip to content

Commit

Permalink
sync with peers before producing blocks (FuelLabs#1169)
Browse files Browse the repository at this point in the history
Closes FuelLabs#1120

After refactoring and enabling `TaskParams` for `Services` in order for
them to be turned into `Task` with a `TaskParam`,
I created SyncPort, and added it as a `TaskParam` for `poa`'s Service.

The current implementation is that the `SyncAdapter` only checks for 60
seconds if a new block was imported, if not, it considers the PoA node
to be synced.

Improvement on top of this would be - waiting first to connect to N
amount of Reserved Peers.

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
leviathanbeak and xgreenx authored Jun 9, 2023
1 parent 6405e64 commit e4f5d65
Show file tree
Hide file tree
Showing 23 changed files with 750 additions and 94 deletions.
1 change: 1 addition & 0 deletions bin/e2e-test-client/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use fuel_core::service::{
Config,
FuelService,
};

// Add methods on commands
use fuel_core_e2e_client::config::SuiteConfig;
use std::fs;
Expand Down
6 changes: 1 addition & 5 deletions crates/chain-config/src/config/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use fuel_core_types::{
fuel_crypto::Hasher,
fuel_tx::{
ConsensusParameters,
Input,
UtxoId,
},
fuel_types::{
Expand Down Expand Up @@ -46,7 +45,6 @@ use crate::{
coin::CoinConfig,
state::StateConfig,
},
default_consensus_dev_key,
genesis::GenesisCommitment,
ConsensusConfig,
};
Expand Down Expand Up @@ -81,9 +79,7 @@ impl Default for ChainConfig {
transaction_parameters: ConsensusParameters::DEFAULT,
initial_state: None,
gas_costs: GasCosts::default(),
consensus: ConsensusConfig::PoA {
signing_key: Input::owner(&default_consensus_dev_key().public_key()),
},
consensus: ConsensusConfig::default_poa(),
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion crates/chain-config/src/config/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
use fuel_core_types::fuel_types::Address;
use fuel_core_types::{
fuel_tx::Input,
fuel_types::Address,
};
use serde::{
Deserialize,
Serialize,
};

use crate::default_consensus_dev_key;

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub enum ConsensusConfig {
PoA { signing_key: Address },
}

impl ConsensusConfig {
pub fn default_poa() -> Self {
ConsensusConfig::PoA {
signing_key: Input::owner(&default_consensus_dev_key().public_key()),
}
}
}
5 changes: 3 additions & 2 deletions crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ mod tests {
i += 1;
}

// current services: graphql, txpool, PoA
#[allow(unused_mut)]
let mut expected_services = 3;

Expand All @@ -320,8 +321,8 @@ mod tests {
// }
#[cfg(feature = "p2p")]
{
// p2p
expected_services += 1;
// p2p & sync
expected_services += 2;
}

// # Dev-note: Update the `expected_services` when we add/remove a new/old service.
Expand Down
46 changes: 41 additions & 5 deletions crates/fuel-core/src/service/adapters/consensus_module/poa.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::ops::Deref;

use crate::{
database::Database,
fuel_core_graphql_api::ports::ConsensusModulePort,
service::adapters::{
BlockImporterAdapter,
BlockProducerAdapter,
P2PAdapter,
PoAAdapter,
TxPoolAdapter,
},
Expand All @@ -12,6 +15,7 @@ use anyhow::anyhow;
use fuel_core_poa::{
ports::{
BlockImporter,
P2pPort,
TransactionPool,
},
service::SharedState,
Expand All @@ -23,12 +27,19 @@ use fuel_core_types::{
fuel_tx::TxId,
fuel_types::BlockHeight,
services::{
block_importer::UncommittedResult as UncommittedImporterResult,
block_importer::{
BlockImportInfo,
UncommittedResult as UncommittedImporterResult,
},
executor::UncommittedResult,
txpool::ArcPoolTx,
},
tai64::Tai64,
};
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};

impl PoAAdapter {
pub fn new(shared_state: Option<SharedState>) -> Self {
Expand Down Expand Up @@ -65,10 +76,6 @@ impl TransactionPool for TxPoolAdapter {
}

fn transaction_status_events(&self) -> BoxStream<TxId> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
Box::pin(
BroadcastStream::new(self.service.new_tx_notification_subscribe())
.filter_map(|result| result.ok()),
Expand Down Expand Up @@ -103,4 +110,33 @@ impl BlockImporter for BlockImporterAdapter {
.commit_result(result)
.map_err(Into::into)
}

fn block_stream(&self) -> BoxStream<BlockImportInfo> {
Box::pin(
BroadcastStream::new(self.block_importer.subscribe())
.filter_map(|result| result.ok())
.map(|r| r.deref().into()),
)
}
}

#[cfg(feature = "p2p")]
impl P2pPort for P2PAdapter {
fn reserved_peers_count(&self) -> BoxStream<usize> {
if let Some(service) = &self.service {
Box::pin(
BroadcastStream::new(service.subscribe_reserved_peers_count())
.filter_map(|result| result.ok()),
)
} else {
Box::pin(tokio_stream::pending())
}
}
}

#[cfg(not(feature = "p2p"))]
impl P2pPort for P2PAdapter {
fn reserved_peers_count(&self) -> BoxStream<usize> {
Box::pin(tokio_stream::pending())
}
}
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl TryFrom<&Config> for fuel_core_poa::Config {
signing_key: config.consensus_key.clone(),
metrics: false,
consensus_params: config.chain_conf.transaction_parameters,
..Default::default()
})
}
}
Expand Down
5 changes: 1 addition & 4 deletions crates/fuel-core/src/service/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ fn import_genesis_block(
(),
);
importer.commit_result(UncommittedImportResult::new(
ImportResult {
sealed_block: block,
tx_status: vec![],
},
ImportResult::new_from_local(block, vec![]),
database_transaction,
))?;
Ok(())
Expand Down
24 changes: 10 additions & 14 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,27 @@ pub fn init_sub_services(
let poa_config: fuel_core_poa::Config = config.try_into()?;
let production_enabled =
!matches!(poa_config.trigger, Trigger::Never) || config.manual_blocks_enabled;

let poa = (production_enabled).then(|| {
fuel_core_poa::new_service(
last_block.header(),
poa_config,
tx_pool_adapter.clone(),
producer_adapter.clone(),
importer_adapter.clone(),
p2p_adapter.clone(),
)
});
let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));

#[cfg(feature = "p2p")]
let sync = (!production_enabled)
.then(|| {
fuel_core_sync::service::new_service(
*last_block.header().height(),
p2p_adapter,
importer_adapter.clone(),
verifier,
config.sync,
)
})
.transpose()?;
let sync = fuel_core_sync::service::new_service(
*last_block.header().height(),
p2p_adapter,
importer_adapter.clone(),
verifier,
config.sync,
)?;

// TODO: Figure out on how to move it into `fuel-core-graphql-api`.
let schema = {
Expand Down Expand Up @@ -219,9 +217,7 @@ pub fn init_sub_services(
{
if let Some(network) = network.take() {
services.push(Box::new(network));
if let Some(sync) = sync {
services.push(Box::new(sync));
}
services.push(Box::new(sync));
}
}

Expand Down
18 changes: 17 additions & 1 deletion crates/services/consensus_module/poa/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,29 @@ use fuel_core_types::{
};
use tokio::time::Duration;

#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct Config {
pub trigger: Trigger,
pub block_gas_limit: Word,
pub signing_key: Option<Secret<SecretKeyWrapper>>,
pub metrics: bool,
pub consensus_params: ConsensusParameters,
pub min_connected_reserved_peers: usize,
pub time_until_synced: Duration,
}

impl Default for Config {
fn default() -> Self {
Config {
trigger: Trigger::default(),
block_gas_limit: 0,
signing_key: None,
metrics: false,
consensus_params: ConsensusParameters::default(),
min_connected_reserved_peers: 0,
time_until_synced: Duration::ZERO,
}
}
}

/// Block production trigger for PoA operation
Expand Down
1 change: 1 addition & 0 deletions crates/services/consensus_module/poa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![deny(unused_must_use)]

mod deadline_clock;
mod sync;

#[cfg(test)]
mod service_test;
Expand Down
20 changes: 19 additions & 1 deletion crates/services/consensus_module/poa/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use fuel_core_types::{
Bytes32,
},
services::{
block_importer::UncommittedResult as UncommittedImportResult,
block_importer::{
BlockImportInfo,
UncommittedResult as UncommittedImportResult,
},
executor::UncommittedResult as UncommittedExecutionResult,
txpool::ArcPoolTx,
},
Expand Down Expand Up @@ -58,6 +61,8 @@ pub trait BlockImporter: Send + Sync {
&self,
result: UncommittedImportResult<StorageTransaction<Self::Database>>,
) -> anyhow::Result<()>;

fn block_stream(&self) -> BoxStream<BlockImportInfo>;
}

#[cfg_attr(test, mockall::automock)]
Expand All @@ -83,3 +88,16 @@ pub trait RelayerPort {
max_da_lag: &DaBlockHeight,
) -> anyhow::Result<()>;
}

#[cfg_attr(test, mockall::automock)]
pub trait P2pPort: Send + Sync + 'static {
/// Subscribe to reserved peers connection updates.
fn reserved_peers_count(&self) -> BoxStream<usize>;
}

#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait SyncPort: Send + Sync {
/// await synchronization with the peers
async fn sync_with_peers(&mut self) -> anyhow::Result<()>;
}
Loading

0 comments on commit e4f5d65

Please sign in to comment.