Skip to content

Commit

Permalink
Split tx pool and offchain notification handling (paritytech#6231)
Browse files Browse the repository at this point in the history
Instead of having the tx pool and offchain worker being feed from the
same import notification stream, this pr splits them to use two
different streams. The first advantage of this split is that the tx pool
will not be spawned anymore in another task and instead will directly
process the notification in the same task. This has the advantage of
being faster when the system is being under load, as the tx pool will
not be waiting for being scheduled to handle the notification.
  • Loading branch information
bkchr authored Jun 3, 2020
1 parent 2743ed4 commit b4b6ab9
Showing 1 changed file with 56 additions and 45 deletions.
101 changes: 56 additions & 45 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,63 +1025,74 @@ ServiceBuilder<

let spawn_handle = task_manager.spawn_handle();

// Inform the tx pool about imported and finalized blocks.
{
// block notifications
let txpool = Arc::downgrade(&transaction_pool);

let mut import_stream = client.import_notification_stream().map(|n| ChainEvent::NewBlock {
id: BlockId::Hash(n.hash),
header: n.header,
retracted: n.retracted,
is_new_best: n.is_new_best,
}).fuse();
let mut finality_stream = client.finality_notification_stream()
.map(|n| ChainEvent::Finalized::<TBl> { hash: n.hash })
.fuse();

let events = async move {
loop {
let evt = futures::select! {
evt = import_stream.next() => evt,
evt = finality_stream.next() => evt,
complete => return,
};

let txpool = txpool.upgrade();
if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) {
txpool.maintain(evt).await;
}
}
};

spawn_handle.spawn(
"txpool-notifications",
events,
);
}

// Inform the offchain worker about new imported blocks
{
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let notifications_spawn_handle = task_manager.spawn_handle();
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
let is_validator = config.role.is_authority();

let (import_stream, finality_stream) = (
client.import_notification_stream().map(|n| ChainEvent::NewBlock {
id: BlockId::Hash(n.hash),
header: n.header,
retracted: n.retracted,
is_new_best: n.is_new_best,
}),
client.finality_notification_stream().map(|n| ChainEvent::Finalized {
hash: n.hash
})
);
let events = futures::stream::select(import_stream, finality_stream)
.for_each(move |event| {
// offchain worker is only interested in block import events
if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event {
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
match offchain {
Some(offchain) if is_new_best => {
notifications_spawn_handle.spawn(
"offchain-on-block",
offchain.on_block_imported(
&header,
network_state_info.clone(),
is_validator,
),
);
},
Some(_) => log::debug!(
target: "sc_offchain",
"Skipping offchain workers for non-canon block: {:?}",
header,
),
_ => {},
}
};

let txpool = txpool.upgrade();
if let Some(txpool) = txpool.as_ref() {
let events = client.import_notification_stream().for_each(move |n| {
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
match offchain {
Some(offchain) if n.is_new_best => {
notifications_spawn_handle.spawn(
"txpool-maintain",
txpool.maintain(event),
"offchain-on-block",
offchain.on_block_imported(
&n.header,
network_state_info.clone(),
is_validator,
),
);
}
},
Some(_) => log::debug!(
target: "sc_offchain",
"Skipping offchain workers for non-canon block: {:?}",
n.header,
),
_ => {},
}

ready(())
});
ready(())
});

spawn_handle.spawn(
"txpool-and-offchain-notif",
"offchain-notifications",
events,
);
}
Expand Down

0 comments on commit b4b6ab9

Please sign in to comment.