diff --git a/bridges/relays/clients/substrate/src/finality_source.rs b/bridges/relays/clients/substrate/src/finality_source.rs index 18293efa128f9..9b6c0975a45cf 100644 --- a/bridges/relays/clients/substrate/src/finality_source.rs +++ b/bridges/relays/clients/substrate/src/finality_source.rs @@ -98,7 +98,7 @@ where >, P::Header: SourceHeader, { - type FinalityProofsStream = Pin>>>; + type FinalityProofsStream = Pin> + Send>>; async fn best_finalized_block_number(&self) -> Result { // we **CAN** continue to relay finality proofs if source node is out of sync, because diff --git a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs index e703711843a9c..b70decd6f49f9 100644 --- a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs +++ b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs @@ -48,9 +48,7 @@ pub struct EthereumDeployContractParams { } /// Deploy Bridge contract on Ethereum chain. -pub fn run(params: EthereumDeployContractParams) { - let mut local_pool = futures::executor::LocalPool::new(); - +pub async fn run(params: EthereumDeployContractParams) { let EthereumDeployContractParams { eth_params, eth_sign, @@ -61,7 +59,7 @@ pub fn run(params: EthereumDeployContractParams) { eth_contract_code, } = params; - let result = local_pool.run_until(async move { + let result = async move { let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; let sub_client = SubstrateClient::::new(sub_params).await.map_err(RpcError::Substrate)?; @@ -91,7 +89,7 @@ pub fn run(params: EthereumDeployContractParams) { initial_set_id, initial_set, ).await - }); + }.await; if let Err(error) = result { log::error!(target: "bridge", "{}", error); diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index ecd22ab81a1f9..5fed62b9ca686 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -282,19 +282,39 @@ impl TargetClient for SubstrateTransactionsTarget { } /// Relay exchange transaction proof(s) to Substrate node. -pub fn run(params: EthereumExchangeParams) { +pub async fn run(params: EthereumExchangeParams) { match params.mode { - ExchangeRelayMode::Single(eth_tx_hash) => run_single_transaction_relay(params, eth_tx_hash), + ExchangeRelayMode::Single(eth_tx_hash) => { + let result = run_single_transaction_relay(params, eth_tx_hash).await; + match result { + Ok(_) => log::info!( + target: "bridge", + "Ethereum transaction {} proof has been successfully submitted to Substrate node", + eth_tx_hash, + ), + Err(err) => log::error!( + target: "bridge", + "Error submitting Ethereum transaction {} proof to Substrate node: {}", + eth_tx_hash, + err, + ), + } + } ExchangeRelayMode::Auto(eth_start_with_block_number) => { - run_auto_transactions_relay_loop(params, eth_start_with_block_number) + let result = run_auto_transactions_relay_loop(params, eth_start_with_block_number).await; + if let Err(err) = result { + log::error!( + target: "bridge", + "Error auto-relaying Ethereum transactions proofs to Substrate node: {}", + err, + ); + } } - }; + } } /// Run single transaction proof relay and stop. -fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) { - let mut local_pool = futures::executor::LocalPool::new(); - +async fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) -> Result<(), String> { let EthereumExchangeParams { eth_params, sub_params, @@ -303,43 +323,25 @@ fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H25 .. } = params; - let result = local_pool.run_until(async move { - let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; - let sub_client = SubstrateClient::::new(sub_params) - .await - .map_err(RpcError::Substrate)?; - - let source = EthereumTransactionsSource { client: eth_client }; - let target = SubstrateTransactionsTarget { - client: sub_client, - sign_params: sub_sign, - bridge_instance: instance, - }; + let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; + let sub_client = SubstrateClient::::new(sub_params) + .await + .map_err(RpcError::Substrate)?; - relay_single_transaction_proof(&source, &target, eth_tx_hash).await - }); + let source = EthereumTransactionsSource { client: eth_client }; + let target = SubstrateTransactionsTarget { + client: sub_client, + sign_params: sub_sign, + bridge_instance: instance, + }; - match result { - Ok(_) => { - log::info!( - target: "bridge", - "Ethereum transaction {} proof has been successfully submitted to Substrate node", - eth_tx_hash, - ); - } - Err(err) => { - log::error!( - target: "bridge", - "Error submitting Ethereum transaction {} proof to Substrate node: {}", - eth_tx_hash, - err, - ); - } - } + relay_single_transaction_proof(&source, &target, eth_tx_hash).await } -/// Run auto-relay loop. -fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_with_block_number: Option) { +async fn run_auto_transactions_relay_loop( + params: EthereumExchangeParams, + eth_start_with_block_number: Option, +) -> Result<(), String> { let EthereumExchangeParams { eth_params, sub_params, @@ -349,46 +351,41 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi .. } = params; - let do_run_loop = move || -> Result<(), String> { - let eth_client = async_std::task::block_on(EthereumClient::new(eth_params)) - .map_err(|err| format!("Error starting Ethereum client: {:?}", err))?; - let sub_client = async_std::task::block_on(SubstrateClient::::new(sub_params)) - .map_err(|err| format!("Error starting Substrate client: {:?}", err))?; - - let eth_start_with_block_number = match eth_start_with_block_number { - Some(eth_start_with_block_number) => eth_start_with_block_number, - None => { - async_std::task::block_on(sub_client.best_ethereum_finalized_block()) - .map_err(|err| { - format!( - "Error retrieving best finalized Ethereum block from Substrate node: {:?}", - err - ) - })? - .0 - } - }; - - run_loop( - InMemoryStorage::new(eth_start_with_block_number), - EthereumTransactionsSource { client: eth_client }, - SubstrateTransactionsTarget { - client: sub_client, - sign_params: sub_sign, - bridge_instance: instance, - }, - metrics_params, - futures::future::pending(), - ); - - Ok(()) + let eth_client = EthereumClient::new(eth_params) + .await + .map_err(|err| format!("Error starting Ethereum client: {:?}", err))?; + let sub_client = SubstrateClient::::new(sub_params) + .await + .map_err(|err| format!("Error starting Substrate client: {:?}", err))?; + + let eth_start_with_block_number = match eth_start_with_block_number { + Some(eth_start_with_block_number) => eth_start_with_block_number, + None => { + sub_client + .best_ethereum_finalized_block() + .await + .map_err(|err| { + format!( + "Error retrieving best finalized Ethereum block from Substrate node: {:?}", + err + ) + })? + .0 + } }; - if let Err(err) = do_run_loop() { - log::error!( - target: "bridge", - "Error auto-relaying Ethereum transactions proofs to Substrate node: {}", - err, - ); - } + run_loop( + InMemoryStorage::new(eth_start_with_block_number), + EthereumTransactionsSource { client: eth_client }, + SubstrateTransactionsTarget { + client: sub_client, + sign_params: sub_sign, + bridge_instance: instance, + }, + metrics_params, + futures::future::pending(), + ) + .await; + + Ok(()) } diff --git a/bridges/relays/ethereum/src/ethereum_exchange_submit.rs b/bridges/relays/ethereum/src/ethereum_exchange_submit.rs index 8f9f942dac53e..8616cce2166cb 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange_submit.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange_submit.rs @@ -42,9 +42,7 @@ pub struct EthereumExchangeSubmitParams { } /// Submit single Ethereum -> Substrate exchange transaction. -pub fn run(params: EthereumExchangeSubmitParams) { - let mut local_pool = futures::executor::LocalPool::new(); - +pub async fn run(params: EthereumExchangeSubmitParams) { let EthereumExchangeSubmitParams { eth_params, eth_sign, @@ -53,7 +51,7 @@ pub fn run(params: EthereumExchangeSubmitParams) { sub_recipient, } = params; - let result: Result<_, String> = local_pool.run_until(async move { + let result: Result<_, String> = async move { let eth_client = EthereumClient::new(eth_params) .await .map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?; @@ -94,7 +92,8 @@ pub fn run(params: EthereumExchangeSubmitParams) { .map_err(|err| format!("error submitting transaction: {:?}", err))?; Ok(eth_tx_unsigned) - }); + } + .await; match result { Ok(eth_tx_unsigned) => { diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index 2c1abb358c63f..b4fd788f9f674 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -248,7 +248,7 @@ impl TargetClient for SubstrateHeadersTarget { } /// Run Ethereum headers synchronization. -pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { +pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> { let EthereumSyncParams { eth_params, sub_params, @@ -278,7 +278,8 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { sync_params, metrics_params, futures::future::pending(), - ); + ) + .await; Ok(()) } diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index c0c36cc130666..b2080c396f2d7 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -50,6 +50,10 @@ fn main() { let yaml = clap::load_yaml!("cli.yml"); let matches = clap::App::from_yaml(yaml).get_matches(); + async_std::task::block_on(run_command(&matches)); +} + +async fn run_command(matches: &clap::ArgMatches<'_>) { match matches.subcommand() { ("eth-to-sub", Some(eth_to_sub_matches)) => { log::info!(target: "bridge", "Starting ETH ➡ SUB relay."); @@ -60,6 +64,7 @@ fn main() { return; } }) + .await .is_err() { log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync."); @@ -74,6 +79,7 @@ fn main() { return; } }) + .await .is_err() { log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync."); @@ -87,7 +93,8 @@ fn main() { log::error!(target: "bridge", "Error during contract deployment: {}", err); return; } - }); + }) + .await; } ("eth-submit-exchange-tx", Some(eth_exchange_submit_matches)) => { log::info!(target: "bridge", "Submitting ETH ➡ SUB exchange transaction."); @@ -97,7 +104,8 @@ fn main() { log::error!(target: "bridge", "Error submitting Eethereum exchange transaction: {}", err); return; } - }); + }) + .await; } ("eth-exchange-sub", Some(eth_exchange_matches)) => { log::info!(target: "bridge", "Starting ETH ➡ SUB exchange transactions relay."); @@ -107,7 +115,8 @@ fn main() { log::error!(target: "bridge", "Error relaying Ethereum transactions proofs: {}", err); return; } - }); + }) + .await; } ("", _) => { log::error!(target: "bridge", "No subcommand specified"); @@ -398,16 +407,3 @@ fn parse_hex_argument(matches: &clap::ArgMatches, arg: &str) -> Result Ok(None), } } - -#[cfg(test)] -mod tests { - - // Details: https://github.com/paritytech/parity-bridges-common/issues/118 - #[test] - fn async_std_sleep_works() { - let mut local_pool = futures::executor::LocalPool::new(); - local_pool.run_until(async move { - async_std::task::sleep(std::time::Duration::from_secs(1)).await; - }); - } -} diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 5f30e247c5a79..a0ff44d4d9bea 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -163,7 +163,7 @@ impl TargetClient for EthereumHeadersTarget { } /// Run Substrate headers synchronization. -pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { +pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { let SubstrateSyncParams { sub_params, eth_params, @@ -188,7 +188,8 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { sync_params, metrics_params, futures::future::pending(), - ); + ) + .await; Ok(()) } diff --git a/bridges/relays/generic/exchange/src/exchange_loop.rs b/bridges/relays/generic/exchange/src/exchange_loop.rs index 06f4d3f40ab01..f09ad7de41bb5 100644 --- a/bridges/relays/generic/exchange/src/exchange_loop.rs +++ b/bridges/relays/generic/exchange/src/exchange_loop.rs @@ -79,7 +79,7 @@ impl TransactionProofsRelayStorage for InMemoryStorag } /// Run proofs synchronization. -pub fn run( +pub async fn run( storage: impl TransactionProofsRelayStorage>, source_client: impl SourceClient

, target_client: impl TargetClient

, @@ -119,7 +119,8 @@ pub fn run( exit_signal.clone(), ) }, - ); + ) + .await; } /// Run proofs synchronization. diff --git a/bridges/relays/generic/finality/src/finality_loop.rs b/bridges/relays/generic/finality/src/finality_loop.rs index af5da42cee70c..7aafce075e608 100644 --- a/bridges/relays/generic/finality/src/finality_loop.rs +++ b/bridges/relays/generic/finality/src/finality_loop.rs @@ -91,7 +91,7 @@ pub trait TargetClient: RelayClient { } /// Run finality proofs synchronization loop. -pub fn run( +pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, @@ -132,7 +132,8 @@ pub fn run( exit_signal.clone(), ) }, - ); + ) + .await; } /// Unjustified headers container. Ordered by header number. diff --git a/bridges/relays/generic/headers/src/sync_loop.rs b/bridges/relays/generic/headers/src/sync_loop.rs index d2584f2ccb2c8..7da8fd4f42fec 100644 --- a/bridges/relays/generic/headers/src/sync_loop.rs +++ b/bridges/relays/generic/headers/src/sync_loop.rs @@ -112,7 +112,7 @@ impl SyncMaintain

for () {} /// Run headers synchronization. #[allow(clippy::too_many_arguments)] -pub fn run>( +pub async fn run>( source_client: impl SourceClient

, source_tick: Duration, target_client: TC, @@ -159,7 +159,8 @@ pub fn run>( exit_signal.clone(), ) }, - ); + ) + .await; } /// Run headers synchronization. diff --git a/bridges/relays/generic/messages/src/message_lane_loop.rs b/bridges/relays/generic/messages/src/message_lane_loop.rs index 28b55dba47cc7..afbaf7a015a45 100644 --- a/bridges/relays/generic/messages/src/message_lane_loop.rs +++ b/bridges/relays/generic/messages/src/message_lane_loop.rs @@ -206,7 +206,7 @@ pub struct ClientsState { } /// Run message lane service loop. -pub fn run( +pub async fn run( params: Params, source_client: impl SourceClient

, target_client: impl TargetClient

, @@ -251,7 +251,8 @@ pub fn run( exit_signal.clone(), ) }, - ); + ) + .await; } /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. diff --git a/bridges/relays/generic/utils/src/relay_loop.rs b/bridges/relays/generic/utils/src/relay_loop.rs index d750358edaa02..6a61ecd289343 100644 --- a/bridges/relays/generic/utils/src/relay_loop.rs +++ b/bridges/relays/generic/utils/src/relay_loop.rs @@ -37,7 +37,7 @@ pub trait Client: Clone + Send + Sync { /// This function represents an outer loop, which in turn calls provided `loop_run` function to do /// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source, /// target or both) and calls `loop_run` again. -pub fn run( +pub async fn run( reconnect_delay: Duration, mut source_client: SC, mut target_client: TC, @@ -46,50 +46,46 @@ pub fn run( R: Fn(SC, TC) -> F, F: Future>, { - let mut local_pool = futures::executor::LocalPool::new(); + loop { + let result = loop_run(source_client.clone(), target_client.clone()).await; - local_pool.run_until(async move { - loop { - let result = loop_run(source_client.clone(), target_client.clone()).await; - - match result { - Ok(()) => break, - Err(failed_client) => loop { - async_std::task::sleep(reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - match source_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to source client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue; - } + match result { + Ok(()) => break, + Err(failed_client) => loop { + async_std::task::sleep(reconnect_delay).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + match source_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to source client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; } } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - match target_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to target client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue; - } + } + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + match target_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to target client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; } } + } - break; - }, - } - - log::debug!(target: "bridge", "Restarting relay loop"); + break; + }, } - }); + + log::debug!(target: "bridge", "Restarting relay loop"); + } } diff --git a/bridges/relays/substrate/src/finality_pipeline.rs b/bridges/relays/substrate/src/finality_pipeline.rs index 21865b6c4485b..574db6a3f533a 100644 --- a/bridges/relays/substrate/src/finality_pipeline.rs +++ b/bridges/relays/substrate/src/finality_pipeline.rs @@ -126,5 +126,6 @@ pub async fn run( }, metrics_params, futures::future::pending(), - ); + ) + .await; } diff --git a/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs b/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs index 84664e4572bba..08a0fe9c08806 100644 --- a/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs +++ b/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs @@ -125,7 +125,7 @@ type MillauSourceClient = SubstrateMessagesSource; /// Run Millau-to-Rialto messages sync. -pub fn run( +pub async fn run( millau_client: MillauClient, millau_sign: MillauSigningParams, rialto_client: RialtoClient, @@ -185,5 +185,6 @@ pub fn run( RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE), metrics_params, futures::future::pending(), - ); + ) + .await; } diff --git a/bridges/relays/substrate/src/rialto_millau/mod.rs b/bridges/relays/substrate/src/rialto_millau/mod.rs index 72919ccb2422d..45ef7e322af8b 100644 --- a/bridges/relays/substrate/src/rialto_millau/mod.rs +++ b/bridges/relays/substrate/src/rialto_millau/mod.rs @@ -198,7 +198,8 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { rialto_sign, lane.into(), prometheus_params.into(), - ); + ) + .await; } cli::RelayMessages::RialtoToMillau { rialto, @@ -220,7 +221,8 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { millau_sign, lane.into(), prometheus_params.into(), - ); + ) + .await; } } Ok(()) diff --git a/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs b/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs index 3083ce3cbf5d4..b0ce256aa7669 100644 --- a/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs +++ b/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs @@ -125,7 +125,7 @@ type RialtoSourceClient = SubstrateMessagesSource; /// Run Rialto-to-Millau messages sync. -pub fn run( +pub async fn run( rialto_client: RialtoClient, rialto_sign: RialtoSigningParams, millau_client: MillauClient, @@ -184,5 +184,6 @@ pub fn run( MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE), metrics_params, futures::future::pending(), - ); + ) + .await; }