Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Spawn new thread to wait for the target block
Browse files Browse the repository at this point in the history
  • Loading branch information
samelamin committed Dec 4, 2022
1 parent 052b310 commit ee9cd1f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 40 deletions.
121 changes: 81 additions & 40 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ use polkadot_primitives::v2::{
};

use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, future::FutureExt, Future, StreamExt};
use futures::{channel::oneshot, future::FutureExt, select, Future, StreamExt};

use cumulus_client_consensus_common::parachain_consensus::RelaychainClient;
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
use std::{
convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc, thread::sleep, time::Duration,
};

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -465,12 +468,27 @@ impl<Block: BlockT> WaitForParachainTargetBlock<Block> {
pub async fn warp_sync_get(
para_id: ParaId,
relay_chain_interface: Arc<dyn RelayChainInterface>,
spawner: Arc<dyn SpawnNamed + Send + Sync>,
) -> Result<oneshot::Receiver<Block::Header>, BoxedError>
where
Block: BlockT + 'static,
{
let (sender, receiver) = oneshot::channel::<Block::Header>();
Self::wait_for_target_block(sender, para_id, relay_chain_interface).await;

spawner.spawn(
"cumulus-parachain-wait-for-target-block",
None,
async move {
tracing::debug!(
target: LOG_TARGET,
"waiting for target block in a background task...",
);
Self::wait_for_target_block(sender, para_id, relay_chain_interface).await;
tracing::debug!(target: LOG_TARGET, "target block reached",);
}
.boxed(),
);

return Ok(receiver)
}

Expand All @@ -479,46 +497,69 @@ impl<Block: BlockT> WaitForParachainTargetBlock<Block> {
para_id: ParaId,
relay_chain_interface: Arc<dyn RelayChainInterface>,
) {
let is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| {
tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
})
.unwrap_or(false);

let mut import_stream =
relay_chain_interface.import_notification_stream().await.unwrap().fuse();
loop {
if !is_syncing {
let mut finalized_heads = match relay_chain_interface.finalized_heads(para_id).await
{
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: "cumulus-network", "Stopping following finalized head.");
return
};

let target_header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
select! {
i = import_stream.next() => {
match i {
Some(_header) => {
let is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
"Unable to determine sync status. {}",
e
)
})
.unwrap_or(false);

if !is_syncing {
let mut finalized_heads =
match relay_chain_interface.finalized_heads(para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(
target: LOG_TARGET,
"Stopping following finalized head."
);
return
};

let target_header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue
},
};

let _ = sender.send(target_header);
break
}
tracing::debug!(
target: "cumulus-network",
error = ?err,
"Could not decode parachain header while following finalized heads.",
target: LOG_TARGET,
"waiting for relay chain sync to complete......",
);
continue
},
};

let _ = sender.send(target_header);
break
sleep(Duration::from_secs(120));
},
None => ()
}
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ async fn start_node_impl(
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
id,
relay_chain_interface.clone(),
Arc::new(task_manager.spawn_handle()),
)
.await
{
Expand Down
3 changes: 3 additions & 0 deletions polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ where
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
Arc::new(task_manager.spawn_handle()),
)
.await
{
Expand Down Expand Up @@ -602,6 +603,7 @@ where
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
Arc::new(task_manager.spawn_handle()),
)
.await
{
Expand Down Expand Up @@ -1381,6 +1383,7 @@ where
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
Arc::new(task_manager.spawn_handle()),
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ where
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
Arc::new(task_manager.spawn_handle()),
)
.await
{
Expand Down

0 comments on commit ee9cd1f

Please sign in to comment.