Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Revalidate transactions only on latest best block (#6824)
Browse files Browse the repository at this point in the history
* Revalidate transactions only on latest best block

We should revalidate transactions only on the latest best block and not
on any arbitrary block. The revalidation before failed when there were
multiple blocks on the height given to the revalidation function, but no
block was imported as best block.

* Update test-utils/runtime/transaction-pool/src/lib.rs

Co-authored-by: Jaco Greeff <jacogr@gmail.com>

* Fix tests

* Only process best blocks in the transaction pool

Co-authored-by: Jaco Greeff <jacogr@gmail.com>
  • Loading branch information
bkchr and jacogr authored Aug 7, 2020
1 parent 2e9b63b commit 74804b5
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 206 deletions.
4 changes: 1 addition & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,9 @@ mod tests {

futures::executor::block_on(
service.transaction_pool().maintain(
ChainEvent::NewBlock {
is_new_best: true,
ChainEvent::NewBestBlock {
hash: parent_header.hash(),
tree_route: None,
header: parent_header.clone(),
},
)
);
Expand Down
20 changes: 12 additions & 8 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! A set of APIs supported by the client along with their primitives.
use std::{fmt, collections::HashSet, sync::Arc};
use std::{fmt, collections::HashSet, sync::Arc, convert::TryFrom};
use sp_core::storage::StorageKey;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand Down Expand Up @@ -252,13 +252,17 @@ pub struct FinalityNotification<Block: BlockT> {
pub header: Block::Header,
}

impl<B: BlockT> From<BlockImportNotification<B>> for sp_transaction_pool::ChainEvent<B> {
fn from(n: BlockImportNotification<B>) -> Self {
Self::NewBlock {
is_new_best: n.is_new_best,
hash: n.hash,
header: n.header,
tree_route: n.tree_route,
impl<B: BlockT> TryFrom<BlockImportNotification<B>> for sp_transaction_pool::ChainEvent<B> {
type Error = ();

fn try_from(n: BlockImportNotification<B>) -> Result<Self, ()> {
if n.is_new_best {
Ok(Self::NewBestBlock {
hash: n.hash,
tree_route: n.tree_route,
})
} else {
Err(())
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,9 @@ mod tests {
fn chain_event<B: BlockT>(header: B::Header) -> ChainEvent<B>
where NumberFor<B>: From<u64>
{
ChainEvent::NewBlock {
ChainEvent::NewBestBlock {
hash: header.hash(),
tree_route: None,
is_new_best: true,
header,
}
}

Expand Down
14 changes: 7 additions & 7 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ mod tests {
use sp_consensus::ImportedAux;
use sp_inherents::InherentDataProviders;
use sc_basic_authorship::ProposerFactory;
use sc_client_api::BlockBackend;

fn api() -> Arc<TestApi> {
Arc::new(TestApi::empty())
Expand Down Expand Up @@ -415,15 +416,13 @@ mod tests {
}
}
);
// assert that there's a new block in the db.
assert!(client.header(&BlockId::Number(0)).unwrap().is_some());
let block = client.block(&BlockId::Number(1)).unwrap().unwrap().block;
pool_api.add_block(block, true);
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());

let header = client.header(&BlockId::Number(1)).expect("db error").expect("imported above");
pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
pool.maintain(sp_transaction_pool::ChainEvent::NewBestBlock {
hash: header.hash(),
header,
is_new_best: true,
tree_route: None,
}).await;

Expand All @@ -438,10 +437,11 @@ mod tests {
rx1.await.expect("should be no error receiving"),
Ok(_)
);
assert!(client.header(&BlockId::Number(1)).unwrap().is_some());
let block = client.block(&BlockId::Number(2)).unwrap().unwrap().block;
pool_api.add_block(block, true);
pool_api.increment_nonce(Alice.into());

assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok());
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 2)).await.is_ok());
let (tx2, rx2) = futures::channel::oneshot::channel();
assert!(sink.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash),
Expand Down
17 changes: 8 additions & 9 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod testing;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};

use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin};
use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin, convert::TryInto};
use futures::{prelude::*, future::{self, ready}, channel::oneshot};
use parking_lot::Mutex;

Expand Down Expand Up @@ -549,7 +549,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>> {
match event {
ChainEvent::NewBlock { hash, tree_route, is_new_best, .. } => {
ChainEvent::NewBestBlock { hash, tree_route } => {
let pool = self.pool.clone();
let api = self.api.clone();

Expand Down Expand Up @@ -608,10 +608,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
})
}

// If this is a new best block, we need to prune its transactions from the pool.
if is_new_best {
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);
}
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);

metrics.report(
|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64)
Expand Down Expand Up @@ -690,9 +687,9 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
.map(|tx| tx.hash.clone())
.collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
}

revalidation_strategy.lock().clear();
revalidation_strategy.lock().clear();
}
}.boxed()
}
ChainEvent::Finalized { hash } => {
Expand Down Expand Up @@ -721,7 +718,9 @@ pub async fn notification_future<Client, Pool, Block>(
Client: sc_client_api::BlockchainEvents<Block>,
Pool: MaintainedTransactionPool<Block=Block>,
{
let import_stream = client.import_notification_stream().map(Into::into).fuse();
let import_stream = client.import_notification_stream()
.filter_map(|n| ready(n.try_into().ok()))
.fuse();
let finality_stream = client.finality_notification_stream()
.map(Into::into)
.fuse();
Expand Down
21 changes: 12 additions & 9 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
mut self,
from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
interval: R,
) where R: Send, R::Guard: Send
{
) where R: Send, R::Guard: Send {
let interval = interval.into_stream().fuse();
let from_queue = from_queue.fuse();
futures::pin_mut!(interval, from_queue);
Expand Down Expand Up @@ -253,7 +252,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
if this.members.len() > 0 {
log::debug!(
target: "txpool",
"Updated revalidation queue at {}. Transactions: {:?}",
"Updated revalidation queue at {:?}. Transactions: {:?}",
this.best_block,
this.members,
);
Expand Down Expand Up @@ -298,9 +297,7 @@ where
api: Arc<Api>,
pool: Arc<Pool<Api>>,
interval: R,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>)
where R: Send + 'static, R::Guard: Send
{
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>) where R: Send + 'static, R::Guard: Send {
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue");

let worker = RevalidationWorker::new(api.clone(), pool.clone());
Expand Down Expand Up @@ -338,16 +335,22 @@ where
/// If queue configured with background worker, this will return immediately.
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExtrinsicHash<Api>>) {
pub async fn revalidate_later(
&self,
at: NumberFor<Api>,
transactions: Vec<ExtrinsicHash<Api>>,
) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len());
log::debug!(
target: "txpool", "Sent {} transactions to revalidation queue",
transactions.len(),
);
}

if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
}
return;
} else {
let pool = self.pool.clone();
let api = self.api.clone();
Expand Down
Loading

0 comments on commit 74804b5

Please sign in to comment.