Skip to content

Commit

Permalink
relay_loop().await from main relay function (paritytech#829)
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik authored and serban300 committed Apr 9, 2024
1 parent 05ab29b commit 2fffbce
Show file tree
Hide file tree
Showing 16 changed files with 163 additions and 166 deletions.
2 changes: 1 addition & 1 deletion bridges/relays/clients/substrate/src/finality_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
>,
P::Header: SourceHeader<C::BlockNumber>,
{
type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>>>>;
type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>> + Send>>;

async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because
Expand Down
8 changes: 3 additions & 5 deletions bridges/relays/ethereum/src/ethereum_deploy_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ pub struct EthereumDeployContractParams {
}

/// Deploy Bridge contract on Ethereum chain.
pub fn run(params: EthereumDeployContractParams) {
let mut local_pool = futures::executor::LocalPool::new();

pub async fn run(params: EthereumDeployContractParams) {
let EthereumDeployContractParams {
eth_params,
eth_sign,
Expand All @@ -61,7 +59,7 @@ pub fn run(params: EthereumDeployContractParams) {
eth_contract_code,
} = params;

let result = local_pool.run_until(async move {
let result = async move {
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await.map_err(RpcError::Substrate)?;

Expand Down Expand Up @@ -91,7 +89,7 @@ pub fn run(params: EthereumDeployContractParams) {
initial_set_id,
initial_set,
).await
});
}.await;

if let Err(error) = result {
log::error!(target: "bridge", "{}", error);
Expand Down
157 changes: 77 additions & 80 deletions bridges/relays/ethereum/src/ethereum_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,19 +282,39 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
}

/// Relay exchange transaction proof(s) to Substrate node.
pub fn run(params: EthereumExchangeParams) {
pub async fn run(params: EthereumExchangeParams) {
match params.mode {
ExchangeRelayMode::Single(eth_tx_hash) => run_single_transaction_relay(params, eth_tx_hash),
ExchangeRelayMode::Single(eth_tx_hash) => {
let result = run_single_transaction_relay(params, eth_tx_hash).await;
match result {
Ok(_) => log::info!(
target: "bridge",
"Ethereum transaction {} proof has been successfully submitted to Substrate node",
eth_tx_hash,
),
Err(err) => log::error!(
target: "bridge",
"Error submitting Ethereum transaction {} proof to Substrate node: {}",
eth_tx_hash,
err,
),
}
}
ExchangeRelayMode::Auto(eth_start_with_block_number) => {
run_auto_transactions_relay_loop(params, eth_start_with_block_number)
let result = run_auto_transactions_relay_loop(params, eth_start_with_block_number).await;
if let Err(err) = result {
log::error!(
target: "bridge",
"Error auto-relaying Ethereum transactions proofs to Substrate node: {}",
err,
);
}
}
};
}
}

/// Run single transaction proof relay and stop.
fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) {
let mut local_pool = futures::executor::LocalPool::new();

async fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) -> Result<(), String> {
let EthereumExchangeParams {
eth_params,
sub_params,
Expand All @@ -303,43 +323,25 @@ fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H25
..
} = params;

let result = local_pool.run_until(async move {
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params)
.await
.map_err(RpcError::Substrate)?;

let source = EthereumTransactionsSource { client: eth_client };
let target = SubstrateTransactionsTarget {
client: sub_client,
sign_params: sub_sign,
bridge_instance: instance,
};
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params)
.await
.map_err(RpcError::Substrate)?;

relay_single_transaction_proof(&source, &target, eth_tx_hash).await
});
let source = EthereumTransactionsSource { client: eth_client };
let target = SubstrateTransactionsTarget {
client: sub_client,
sign_params: sub_sign,
bridge_instance: instance,
};

match result {
Ok(_) => {
log::info!(
target: "bridge",
"Ethereum transaction {} proof has been successfully submitted to Substrate node",
eth_tx_hash,
);
}
Err(err) => {
log::error!(
target: "bridge",
"Error submitting Ethereum transaction {} proof to Substrate node: {}",
eth_tx_hash,
err,
);
}
}
relay_single_transaction_proof(&source, &target, eth_tx_hash).await
}

/// Run auto-relay loop.
fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_with_block_number: Option<u64>) {
async fn run_auto_transactions_relay_loop(
params: EthereumExchangeParams,
eth_start_with_block_number: Option<u64>,
) -> Result<(), String> {
let EthereumExchangeParams {
eth_params,
sub_params,
Expand All @@ -349,46 +351,41 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi
..
} = params;

let do_run_loop = move || -> Result<(), String> {
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))
.map_err(|err| format!("Error starting Ethereum client: {:?}", err))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))
.map_err(|err| format!("Error starting Substrate client: {:?}", err))?;

let eth_start_with_block_number = match eth_start_with_block_number {
Some(eth_start_with_block_number) => eth_start_with_block_number,
None => {
async_std::task::block_on(sub_client.best_ethereum_finalized_block())
.map_err(|err| {
format!(
"Error retrieving best finalized Ethereum block from Substrate node: {:?}",
err
)
})?
.0
}
};

run_loop(
InMemoryStorage::new(eth_start_with_block_number),
EthereumTransactionsSource { client: eth_client },
SubstrateTransactionsTarget {
client: sub_client,
sign_params: sub_sign,
bridge_instance: instance,
},
metrics_params,
futures::future::pending(),
);

Ok(())
let eth_client = EthereumClient::new(eth_params)
.await
.map_err(|err| format!("Error starting Ethereum client: {:?}", err))?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params)
.await
.map_err(|err| format!("Error starting Substrate client: {:?}", err))?;

let eth_start_with_block_number = match eth_start_with_block_number {
Some(eth_start_with_block_number) => eth_start_with_block_number,
None => {
sub_client
.best_ethereum_finalized_block()
.await
.map_err(|err| {
format!(
"Error retrieving best finalized Ethereum block from Substrate node: {:?}",
err
)
})?
.0
}
};

if let Err(err) = do_run_loop() {
log::error!(
target: "bridge",
"Error auto-relaying Ethereum transactions proofs to Substrate node: {}",
err,
);
}
run_loop(
InMemoryStorage::new(eth_start_with_block_number),
EthereumTransactionsSource { client: eth_client },
SubstrateTransactionsTarget {
client: sub_client,
sign_params: sub_sign,
bridge_instance: instance,
},
metrics_params,
futures::future::pending(),
)
.await;

Ok(())
}
9 changes: 4 additions & 5 deletions bridges/relays/ethereum/src/ethereum_exchange_submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ pub struct EthereumExchangeSubmitParams {
}

/// Submit single Ethereum -> Substrate exchange transaction.
pub fn run(params: EthereumExchangeSubmitParams) {
let mut local_pool = futures::executor::LocalPool::new();

pub async fn run(params: EthereumExchangeSubmitParams) {
let EthereumExchangeSubmitParams {
eth_params,
eth_sign,
Expand All @@ -53,7 +51,7 @@ pub fn run(params: EthereumExchangeSubmitParams) {
sub_recipient,
} = params;

let result: Result<_, String> = local_pool.run_until(async move {
let result: Result<_, String> = async move {
let eth_client = EthereumClient::new(eth_params)
.await
.map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?;
Expand Down Expand Up @@ -94,7 +92,8 @@ pub fn run(params: EthereumExchangeSubmitParams) {
.map_err(|err| format!("error submitting transaction: {:?}", err))?;

Ok(eth_tx_unsigned)
});
}
.await;

match result {
Ok(eth_tx_unsigned) => {
Expand Down
5 changes: 3 additions & 2 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
}

/// Run Ethereum headers synchronization.
pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
let EthereumSyncParams {
eth_params,
sub_params,
Expand Down Expand Up @@ -278,7 +278,8 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
sync_params,
metrics_params,
futures::future::pending(),
);
)
.await;

Ok(())
}
28 changes: 12 additions & 16 deletions bridges/relays/ethereum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ fn main() {

let yaml = clap::load_yaml!("cli.yml");
let matches = clap::App::from_yaml(yaml).get_matches();
async_std::task::block_on(run_command(&matches));
}

async fn run_command(matches: &clap::ArgMatches<'_>) {
match matches.subcommand() {
("eth-to-sub", Some(eth_to_sub_matches)) => {
log::info!(target: "bridge", "Starting ETH ➡ SUB relay.");
Expand All @@ -60,6 +64,7 @@ fn main() {
return;
}
})
.await
.is_err()
{
log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync.");
Expand All @@ -74,6 +79,7 @@ fn main() {
return;
}
})
.await
.is_err()
{
log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync.");
Expand All @@ -87,7 +93,8 @@ fn main() {
log::error!(target: "bridge", "Error during contract deployment: {}", err);
return;
}
});
})
.await;
}
("eth-submit-exchange-tx", Some(eth_exchange_submit_matches)) => {
log::info!(target: "bridge", "Submitting ETH ➡ SUB exchange transaction.");
Expand All @@ -97,7 +104,8 @@ fn main() {
log::error!(target: "bridge", "Error submitting Eethereum exchange transaction: {}", err);
return;
}
});
})
.await;
}
("eth-exchange-sub", Some(eth_exchange_matches)) => {
log::info!(target: "bridge", "Starting ETH ➡ SUB exchange transactions relay.");
Expand All @@ -107,7 +115,8 @@ fn main() {
log::error!(target: "bridge", "Error relaying Ethereum transactions proofs: {}", err);
return;
}
});
})
.await;
}
("", _) => {
log::error!(target: "bridge", "No subcommand specified");
Expand Down Expand Up @@ -398,16 +407,3 @@ fn parse_hex_argument(matches: &clap::ArgMatches, arg: &str) -> Result<Option<Ve
None => Ok(None),
}
}

#[cfg(test)]
mod tests {

// Details: https://github.com/paritytech/parity-bridges-common/issues/118
#[test]
fn async_std_sleep_works() {
let mut local_pool = futures::executor::LocalPool::new();
local_pool.run_until(async move {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
});
}
}
5 changes: 3 additions & 2 deletions bridges/relays/ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
}

/// Run Substrate headers synchronization.
pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
let SubstrateSyncParams {
sub_params,
eth_params,
Expand All @@ -188,7 +188,8 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
sync_params,
metrics_params,
futures::future::pending(),
);
)
.await;

Ok(())
}
5 changes: 3 additions & 2 deletions bridges/relays/generic/exchange/src/exchange_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorag
}

/// Run proofs synchronization.
pub fn run<P: TransactionProofPipeline>(
pub async fn run<P: TransactionProofPipeline>(
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
Expand Down Expand Up @@ -119,7 +119,8 @@ pub fn run<P: TransactionProofPipeline>(
exit_signal.clone(),
)
},
);
)
.await;
}

/// Run proofs synchronization.
Expand Down
Loading

0 comments on commit 2fffbce

Please sign in to comment.