diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index fdfa6816296aa..fd2c240a44e3b 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -523,11 +523,9 @@ mod tests { futures::executor::block_on( service.transaction_pool().maintain( - ChainEvent::NewBlock { - is_new_best: true, + ChainEvent::NewBestBlock { hash: parent_header.hash(), tree_route: None, - header: parent_header.clone(), }, ) ); diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 35d40965e6425..f97daa487638f 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -16,7 +16,7 @@ //! A set of APIs supported by the client along with their primitives. -use std::{fmt, collections::HashSet, sync::Arc}; +use std::{fmt, collections::HashSet, sync::Arc, convert::TryFrom}; use sp_core::storage::StorageKey; use sp_runtime::{ traits::{Block as BlockT, NumberFor}, @@ -252,13 +252,17 @@ pub struct FinalityNotification { pub header: Block::Header, } -impl From> for sp_transaction_pool::ChainEvent { - fn from(n: BlockImportNotification) -> Self { - Self::NewBlock { - is_new_best: n.is_new_best, - hash: n.hash, - header: n.header, - tree_route: n.tree_route, +impl TryFrom> for sp_transaction_pool::ChainEvent { + type Error = (); + + fn try_from(n: BlockImportNotification) -> Result { + if n.is_new_best { + Ok(Self::NewBestBlock { + hash: n.hash, + tree_route: n.tree_route, + }) + } else { + Err(()) } } } diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 3c56bdd33db04..41d12970464f4 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -346,11 +346,9 @@ mod tests { fn chain_event(header: B::Header) -> ChainEvent where NumberFor: From { - ChainEvent::NewBlock { + ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None, - is_new_best: true, - header, } } diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 2799a498c1fb8..36aeffd9794f0 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -207,6 +207,7 @@ mod tests { use sp_consensus::ImportedAux; use sp_inherents::InherentDataProviders; use sc_basic_authorship::ProposerFactory; + use sc_client_api::BlockBackend; fn api() -> Arc { Arc::new(TestApi::empty()) @@ -415,15 +416,13 @@ mod tests { } } ); - // assert that there's a new block in the db. - assert!(client.header(&BlockId::Number(0)).unwrap().is_some()); + let block = client.block(&BlockId::Number(1)).unwrap().unwrap().block; + pool_api.add_block(block, true); assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok()); let header = client.header(&BlockId::Number(1)).expect("db error").expect("imported above"); - pool.maintain(sp_transaction_pool::ChainEvent::NewBlock { + pool.maintain(sp_transaction_pool::ChainEvent::NewBestBlock { hash: header.hash(), - header, - is_new_best: true, tree_route: None, }).await; @@ -438,10 +437,11 @@ mod tests { rx1.await.expect("should be no error receiving"), Ok(_) ); - assert!(client.header(&BlockId::Number(1)).unwrap().is_some()); + let block = client.block(&BlockId::Number(2)).unwrap().unwrap().block; + pool_api.add_block(block, true); pool_api.increment_nonce(Alice.into()); - assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok()); + assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 2)).await.is_ok()); let (tx2, rx2) = futures::channel::oneshot::channel(); assert!(sink.send(EngineCommand::SealNewBlock { parent_hash: Some(created_block.hash), diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 6255fd478b756..0b6a1e935b9d0 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -34,7 +34,7 @@ pub mod testing; pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; -use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin}; +use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin, convert::TryInto}; use futures::{prelude::*, future::{self, ready}, channel::oneshot}; use parking_lot::Mutex; @@ -549,7 +549,7 @@ impl MaintainedTransactionPool for BasicPool { fn maintain(&self, event: ChainEvent) -> Pin + Send>> { match event { - ChainEvent::NewBlock { hash, tree_route, is_new_best, .. } => { + ChainEvent::NewBestBlock { hash, tree_route } => { let pool = self.pool.clone(); let api = self.api.clone(); @@ -608,10 +608,7 @@ impl MaintainedTransactionPool for BasicPool }) } - // If this is a new best block, we need to prune its transactions from the pool. - if is_new_best { - pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await); - } + pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await); metrics.report( |metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64) @@ -690,9 +687,9 @@ impl MaintainedTransactionPool for BasicPool .map(|tx| tx.hash.clone()) .collect(); revalidation_queue.revalidate_later(block_number, hashes).await; - } - revalidation_strategy.lock().clear(); + revalidation_strategy.lock().clear(); + } }.boxed() } ChainEvent::Finalized { hash } => { @@ -721,7 +718,9 @@ pub async fn notification_future( Client: sc_client_api::BlockchainEvents, Pool: MaintainedTransactionPool, { - let import_stream = client.import_notification_stream().map(Into::into).fuse(); + let import_stream = client.import_notification_stream() + .filter_map(|n| ready(n.try_into().ok())) + .fuse(); let finality_stream = client.finality_notification_stream() .map(Into::into) .fuse(); diff --git a/client/transaction-pool/src/revalidation.rs b/client/transaction-pool/src/revalidation.rs index af9a76c055b6b..7be8688eaea5d 100644 --- a/client/transaction-pool/src/revalidation.rs +++ b/client/transaction-pool/src/revalidation.rs @@ -211,8 +211,7 @@ impl RevalidationWorker { mut self, from_queue: TracingUnboundedReceiver>, interval: R, - ) where R: Send, R::Guard: Send - { + ) where R: Send, R::Guard: Send { let interval = interval.into_stream().fuse(); let from_queue = from_queue.fuse(); futures::pin_mut!(interval, from_queue); @@ -253,7 +252,7 @@ impl RevalidationWorker { if this.members.len() > 0 { log::debug!( target: "txpool", - "Updated revalidation queue at {}. Transactions: {:?}", + "Updated revalidation queue at {:?}. Transactions: {:?}", this.best_block, this.members, ); @@ -298,9 +297,7 @@ where api: Arc, pool: Arc>, interval: R, - ) -> (Self, Pin + Send>>) - where R: Send + 'static, R::Guard: Send - { + ) -> (Self, Pin + Send>>) where R: Send + 'static, R::Guard: Send { let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue"); let worker = RevalidationWorker::new(api.clone(), pool.clone()); @@ -338,16 +335,22 @@ where /// If queue configured with background worker, this will return immediately. /// If queue configured without background worker, this will resolve after /// revalidation is actually done. - pub async fn revalidate_later(&self, at: NumberFor, transactions: Vec>) { + pub async fn revalidate_later( + &self, + at: NumberFor, + transactions: Vec>, + ) { if transactions.len() > 0 { - log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len()); + log::debug!( + target: "txpool", "Sent {} transactions to revalidation queue", + transactions.len(), + ); } if let Some(ref to_worker) = self.background { if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) { log::warn!(target: "txpool", "Failed to update background worker: {:?}", e); } - return; } else { let pool = self.pool.clone(); let api = self.api.clone(); diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index a938313733ecd..8fa742cd419a3 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -106,7 +106,7 @@ fn prune_tags_should_work() { let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); - pool.validated_pool().api().push_block(1, Vec::new()); + pool.validated_pool().api().push_block(1, Vec::new(), true); block_on( pool.prune_tags( &BlockId::number(1), @@ -141,25 +141,14 @@ fn only_prune_on_new_best() { let uxt = uxt(Alice, 209); let _ = block_on( - pool.submit_and_watch(&BlockId::number(1), SOURCE, uxt.clone()) + pool.submit_and_watch(&BlockId::number(0), SOURCE, uxt.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(1, vec![uxt.clone()]); + pool.api.push_block(1, vec![uxt.clone()], true); assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { + let header = pool.api.push_block(2, vec![uxt], true); + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: false, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); - assert_eq!(pool.status().ready, 1); - - let header = pool.api.push_block(2, vec![uxt]); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; block_on(pool.maintain(event)); @@ -179,7 +168,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { // remove the transaction that just got imported. api.increment_nonce(Alice.into()); - api.push_block(1, Vec::new()); + api.push_block(1, Vec::new(), true); block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); assert_eq!(pool.validated_pool().status().ready, 0); // it's re-imported to future @@ -187,7 +176,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { // so now let's insert another transaction that also provides the 155 api.increment_nonce(Alice.into()); - api.push_block(2, Vec::new()); + api.push_block(2, Vec::new(), true); let xt = uxt(Alice, 211); block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt.clone())).expect("2. Imported"); assert_eq!(pool.validated_pool().status().ready, 1); @@ -197,18 +186,16 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { // prune it and make sure the pool is empty api.increment_nonce(Alice.into()); - api.push_block(3, Vec::new()); + api.push_block(3, Vec::new(), true); block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); assert_eq!(pool.validated_pool().status().ready, 0); assert_eq!(pool.validated_pool().status().future, 2); } fn block_event(header: Header) -> ChainEvent { - ChainEvent::NewBlock { + ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, tree_route: None, - header, } } @@ -219,11 +206,9 @@ fn block_event_with_retracted( ) -> ChainEvent { let tree_route = api.tree_route(retracted_start, header.parent_hash).expect("Tree route exists"); - ChainEvent::NewBlock { + ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, tree_route: Some(Arc::new(tree_route)), - header, } } @@ -236,7 +221,7 @@ fn should_prune_old_during_maintenance() { block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(1, vec![xt.clone()]); + let header = pool.api.push_block(1, vec![xt.clone()], true); block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); @@ -253,7 +238,7 @@ fn should_revalidate_during_maintenance() { assert_eq!(pool.status().ready, 2); assert_eq!(pool.api.validation_requests().len(), 2); - let header = pool.api.push_block(1, vec![xt1.clone()]); + let header = pool.api.push_block(1, vec![xt1.clone()], true); block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 1); @@ -272,8 +257,8 @@ fn should_resubmit_from_retracted_during_maintenance() { block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(1, vec![]); - let fork_header = pool.api.push_block(1, vec![]); + let header = pool.api.push_block(1, vec![], true); + let fork_header = pool.api.push_block(1, vec![], false); let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api); @@ -291,8 +276,8 @@ fn should_not_resubmit_from_retracted_during_maintenance_if_tx_is_also_in_enacte block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(1, vec![xt.clone()]); - let fork_header = pool.api.push_block(1, vec![xt]); + let header = pool.api.push_block(1, vec![xt.clone()], true); + let fork_header = pool.api.push_block(1, vec![xt], false); let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api); @@ -309,8 +294,8 @@ fn should_not_retain_invalid_hashes_from_retracted() { block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(1, vec![]); - let fork_header = pool.api.push_block(1, vec![xt.clone()]); + let header = pool.api.push_block(1, vec![], true); + let fork_header = pool.api.push_block(1, vec![xt.clone()], false); pool.api.add_invalid(&xt); let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api); @@ -330,14 +315,14 @@ fn should_revalidate_transaction_multiple_times() { block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(1, vec![xt.clone()]); + let header = pool.api.push_block(1, vec![xt.clone()], true); block_on(pool.maintain(block_event(header))); block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(2, vec![]); + let header = pool.api.push_block(2, vec![], true); pool.api.add_invalid(&xt); block_on(pool.maintain(block_event(header))); @@ -354,18 +339,18 @@ fn should_revalidate_across_many_blocks() { let (pool, _guard, mut notifier) = maintained_pool(); - block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported"); - block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt2.clone())).expect("1. Imported"); + block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported"); + block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt2.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 2); - let header = pool.api.push_block(1, vec![]); + let header = pool.api.push_block(1, vec![], true); block_on(pool.maintain(block_event(header))); block_on(notifier.next()); - block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt3.clone())).expect("1. Imported"); + block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt3.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 3); - let header = pool.api.push_block(2, vec![xt1.clone()]); + let header = pool.api.push_block(2, vec![xt1.clone()], true); block_on(pool.maintain(block_event(header))); block_on(notifier.next()); @@ -411,7 +396,7 @@ fn should_push_watchers_during_maintaince() { pool.api.add_invalid(&tx4); // clear timer events if any - let header = pool.api.push_block(1, vec![]); + let header = pool.api.push_block(1, vec![], true); block_on(pool.maintain(block_event(header))); block_on(notifier.next()); @@ -429,7 +414,7 @@ fn should_push_watchers_during_maintaince() { ); // when - let header = pool.api.push_block(2, vec![tx0, tx1, tx2]); + let header = pool.api.push_block(2, vec![tx0, tx1, tx2], true); let header_hash = header.hash(); block_on(pool.maintain(block_event(header))); @@ -478,18 +463,16 @@ fn can_track_heap_size() { fn finalization() { let xt = uxt(Alice, 209); let api = TestApi::with_alice_nonce(209); - api.push_block(1, vec![]); + api.push_block(1, vec![], true); let (pool, _background, _) = BasicPool::new_test(api.into()); let watcher = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, xt.clone()) ).expect("1. Imported"); - pool.api.push_block(2, vec![xt.clone()]); + pool.api.push_block(2, vec![xt.clone()], true); - let header = pool.api.chain().read().block_by_number.get(&2).unwrap()[0].header().clone(); - let event = ChainEvent::NewBlock { + let header = pool.api.chain().read().block_by_number.get(&2).unwrap()[0].0.header().clone(); + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; block_on(pool.maintain(event)); @@ -508,7 +491,7 @@ fn finalization() { fn fork_aware_finalization() { let api = TestApi::empty(); // starting block A1 (last finalized.) - api.push_block(1, vec![]); + api.push_block(1, vec![], true); let (pool, _background, _) = BasicPool::new_test(api.into()); let mut canon_watchers = vec![]; @@ -534,14 +517,12 @@ fn fork_aware_finalization() { let watcher = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, from_alice.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(2, vec![from_alice.clone()]); + let header = pool.api.push_block(2, vec![from_alice.clone()], true); canon_watchers.push((watcher, header.hash())); assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; b1 = header.hash(); @@ -553,15 +534,13 @@ fn fork_aware_finalization() { // block C2 { - let header = pool.api.push_block_with_parent(b1, vec![from_dave.clone()]); + let header = pool.api.push_block_with_parent(b1, vec![from_dave.clone()], true); from_dave_watcher = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, from_dave.clone()) ).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; c2 = header.hash(); @@ -575,12 +554,10 @@ fn fork_aware_finalization() { pool.submit_and_watch(&BlockId::number(1), SOURCE, from_bob.clone()) ).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block_with_parent(c2, vec![from_bob.clone()]); + let header = pool.api.push_block_with_parent(c2, vec![from_bob.clone()], true); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; d2 = header.hash(); @@ -594,7 +571,7 @@ fn fork_aware_finalization() { pool.submit_and_watch(&BlockId::number(1), SOURCE, from_charlie.clone()) ).expect("1.Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_block(3, vec![from_charlie.clone()]); + let header = pool.api.push_block(3, vec![from_charlie.clone()], true); canon_watchers.push((watcher, header.hash())); let event = block_event_with_retracted(header.clone(), d2, &*pool.api); @@ -612,13 +589,11 @@ fn fork_aware_finalization() { pool.submit_and_watch(&BlockId::number(1), SOURCE, xt.clone()) ).expect("1. Imported"); assert_eq!(pool.status().ready, 3); - let header = pool.api.push_block(4, vec![xt.clone()]); + let header = pool.api.push_block(4, vec![xt.clone()], true); canon_watchers.push((w, header.hash())); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; d1 = header.hash(); @@ -632,12 +607,10 @@ fn fork_aware_finalization() { // block e1 { - let header = pool.api.push_block(5, vec![from_dave, from_bob]); + let header = pool.api.push_block(5, vec![from_dave, from_bob], true); e1 = header.hash(); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; block_on(pool.maintain(event)); @@ -684,7 +657,7 @@ fn fork_aware_finalization() { fn prune_and_retract_tx_at_same_time() { let api = TestApi::empty(); // starting block A1 (last finalized.) - api.push_block(1, vec![]); + api.push_block(1, vec![], true); let (pool, _background, _) = BasicPool::new_test(api.into()); @@ -697,13 +670,11 @@ fn prune_and_retract_tx_at_same_time() { // Block B1 let b1 = { - let header = pool.api.push_block(2, vec![from_alice.clone()]); + let header = pool.api.push_block(2, vec![from_alice.clone()], true); assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; block_on(pool.maintain(event)); @@ -713,7 +684,7 @@ fn prune_and_retract_tx_at_same_time() { // Block B2 let b2 = { - let header = pool.api.push_block(2, vec![from_alice.clone()]); + let header = pool.api.push_block(2, vec![from_alice.clone()], false); assert_eq!(pool.status().ready, 0); let event = block_event_with_retracted(header.clone(), b1, &*pool.api); @@ -757,7 +728,7 @@ fn prune_and_retract_tx_at_same_time() { fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { let api = TestApi::empty(); // starting block A1 (last finalized.) - api.push_block(1, vec![]); + api.push_block(1, vec![], true); let (pool, _background, _) = BasicPool::new_test(api.into()); @@ -773,13 +744,11 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx0.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(2, vec![tx0.clone()]); + let header = pool.api.push_block(2, vec![tx0.clone()], true); assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { + let event = ChainEvent::NewBestBlock { hash: header.hash(), - is_new_best: true, - header: header.clone(), tree_route: None, }; d0 = header.hash(); @@ -792,23 +761,13 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(2, vec![tx1.clone()]); - assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: false, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); - - // Only transactions from new best should be pruned + pool.api.push_block(2, vec![tx1.clone()], false); assert_eq!(pool.status().ready, 1); } // Block D2 { - let header = pool.api.push_block(2, vec![]); + let header = pool.api.push_block(2, vec![], false); let event = block_event_with_retracted(header, d0, &*pool.api); block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); @@ -819,7 +778,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { fn resubmit_from_retracted_fork() { let api = TestApi::empty(); // starting block A1 (last finalized.) - api.push_block(1, vec![]); + api.push_block(1, vec![], true); let (pool, _background, _) = BasicPool::new_test(api.into()); @@ -844,16 +803,10 @@ fn resubmit_from_retracted_fork() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx0.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(2, vec![tx0.clone()]); + let header = pool.api.push_block(2, vec![tx0.clone()], true); assert_eq!(pool.status().ready, 1); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: true, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); } @@ -862,14 +815,8 @@ fn resubmit_from_retracted_fork() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(3, vec![tx1.clone()]); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: true, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); + let header = pool.api.push_block(3, vec![tx1.clone()], true); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); } @@ -878,14 +825,8 @@ fn resubmit_from_retracted_fork() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx2.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(4, vec![tx2.clone()]); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: true, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); + let header = pool.api.push_block(4, vec![tx2.clone()], true); + block_on(pool.maintain(block_event(header.clone()))); assert_eq!(pool.status().ready, 0); header.hash() }; @@ -895,14 +836,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx3.clone()) ).expect("1. Imported"); - let header = pool.api.push_block(2, vec![tx3.clone()]); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: false, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); + let header = pool.api.push_block(2, vec![tx3.clone()], true); assert_eq!(pool.status().ready, 1); header.hash() }; @@ -912,14 +846,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx4.clone()) ).expect("1. Imported"); - let header = pool.api.push_block_with_parent(d1.clone(), vec![tx4.clone()]); - let event = ChainEvent::NewBlock { - hash: header.hash(), - is_new_best: false, - header: header.clone(), - tree_route: None, - }; - block_on(pool.maintain(event)); + let header = pool.api.push_block_with_parent(d1.clone(), vec![tx4.clone()], true); assert_eq!(pool.status().ready, 2); header.hash() }; @@ -929,7 +856,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, tx5.clone()) ).expect("1. Imported"); - let header = pool.api.push_block_with_parent(e1.clone(), vec![tx5.clone()]); + let header = pool.api.push_block_with_parent(e1.clone(), vec![tx5.clone()], true); // Don't announce the block event to the pool directly, because we will // re-org to this block. assert_eq!(pool.status().ready, 3); @@ -953,7 +880,7 @@ fn resubmit_from_retracted_fork() { fn ready_set_should_not_resolve_before_block_update() { let (pool, _guard, _notifier) = maintained_pool(); let xt1 = uxt(Alice, 209); - block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported"); + block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported"); assert!(pool.ready_at(1).now_or_never().is_none()); } @@ -961,7 +888,7 @@ fn ready_set_should_not_resolve_before_block_update() { #[test] fn ready_set_should_resolve_after_block_update() { let (pool, _guard, _notifier) = maintained_pool(); - let header = pool.api.push_block(1, vec![]); + let header = pool.api.push_block(1, vec![], true); let xt1 = uxt(Alice, 209); @@ -974,7 +901,7 @@ fn ready_set_should_resolve_after_block_update() { #[test] fn ready_set_should_eventually_resolve_when_block_update_arrives() { let (pool, _guard, _notifier) = maintained_pool(); - let header = pool.api.push_block(1, vec![]); + let header = pool.api.push_block(1, vec![], true); let xt1 = uxt(Alice, 209); @@ -1063,7 +990,7 @@ fn import_notification_to_pool_maintain_works() { // Get the notification of the block import and maintain the pool with it, // Now, the pool should not contain any transactions. let evt = import_stream.next().expect("Importing a block leads to an event"); - block_on(pool.maintain(evt.into())); + block_on(pool.maintain(evt.try_into().expect("Imported as new best block"))); assert_eq!(pool.status().ready, 0); } @@ -1075,7 +1002,7 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() { let xt1 = Extrinsic::IncludeData(Vec::new()); block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported"); - let header = pool.api.push_block(1, vec![xt1.clone()]); + let header = pool.api.push_block(1, vec![xt1.clone()], true); // This will prune `xt1`. block_on(pool.maintain(block_event(header))); @@ -1091,3 +1018,23 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() { // returned a second time by the iterator. assert!(iterator.next().is_none()); } + +#[test] +fn only_revalidate_on_best_block() { + let xt = uxt(Alice, 209); + + let (pool, _guard, mut notifier) = maintained_pool(); + + block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); + + let header = pool.api.push_block(1, vec![], true); + + pool.api.push_block(2, vec![], false); + pool.api.push_block(2, vec![], false); + + block_on(pool.maintain(block_event(header))); + block_on(notifier.next()); + + assert_eq!(pool.status().ready, 1); +} diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index 7d1d5537dc9bb..6235ca7cdfcf3 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -248,14 +248,10 @@ pub trait TransactionPool: Send + Sync { /// Events that the transaction pool listens for. pub enum ChainEvent { - /// New blocks have been added to the chain - NewBlock { - /// Is this the new best block. - is_new_best: bool, + /// New best block have been added to the chain + NewBestBlock { /// Hash of the block. hash: B::Hash, - /// Header of the just imported block - header: B::Header, /// Tree route from old best to new best parent that was calculated on import. /// /// If `None`, no re-org happened on import. diff --git a/test-utils/runtime/transaction-pool/src/lib.rs b/test-utils/runtime/transaction-pool/src/lib.rs index 17cecd394ab91..f772ba9b02d5c 100644 --- a/test-utils/runtime/transaction-pool/src/lib.rs +++ b/test-utils/runtime/transaction-pool/src/lib.rs @@ -35,6 +35,7 @@ use substrate_test_runtime_client::{ AccountKeyring::{self, *}, }; use sp_blockchain::CachedHeaderMetadata; +use futures::future::ready; /// Error type used by [`TestApi`]. #[derive(Debug, derive_more::From, derive_more::Display)] @@ -52,9 +53,30 @@ impl std::error::Error for Error { } } +pub enum IsBestBlock { + Yes, + No, +} + +impl IsBestBlock { + pub fn is_best(&self) -> bool { + matches!(self, Self::Yes) + } +} + +impl From for IsBestBlock { + fn from(is_best: bool) -> Self { + if is_best { + Self::Yes + } else { + Self::No + } + } +} + #[derive(Default)] pub struct ChainState { - pub block_by_number: BTreeMap>, + pub block_by_number: BTreeMap>, pub block_by_hash: HashMap, pub nonces: HashMap, pub invalid_hashes: HashSet, @@ -86,7 +108,7 @@ impl TestApi { }; // Push genesis block - api.push_block(0, Vec::new()); + api.push_block(0, Vec::new(), true); api } @@ -97,10 +119,12 @@ impl TestApi { } /// Push block under given number. - /// - /// If multiple blocks exists with the same block number, the first inserted block will be - /// interpreted as part of the canonical chain. - pub fn push_block(&self, block_number: BlockNumber, xts: Vec) -> Header { + pub fn push_block( + &self, + block_number: BlockNumber, + xts: Vec, + is_best_block: bool, + ) -> Header { let parent_hash = { let chain = self.chain.read(); block_number @@ -109,12 +133,12 @@ impl TestApi { chain.block_by_number .get(&num) .map(|blocks| { - blocks[0].header.hash() + blocks[0].0.header.hash() }) }).unwrap_or_default() }; - self.push_block_with_parent(parent_hash, xts) + self.push_block_with_parent(parent_hash, xts, is_best_block) } /// Push a block using the given `parent`. @@ -124,14 +148,14 @@ impl TestApi { &self, parent: Hash, xts: Vec, + is_best_block: bool, ) -> Header { - let mut chain = self.chain.write(); - // `Hash::default()` is the genesis parent hash let block_number = if parent == Hash::default() { 0 } else { - *chain.block_by_hash + *self.chain.read() + .block_by_hash .get(&parent) .expect("`parent` exists") .header() @@ -146,14 +170,21 @@ impl TestApi { state_root: Default::default(), }; - let hash = header.hash(); - let block = Block::new(header.clone(), xts); - chain.block_by_hash.insert(hash, block.clone()); - chain.block_by_number.entry(block_number).or_default().push(block); + self.add_block(Block::new(header.clone(), xts), is_best_block); header } + /// Add a block to the internal state. + pub fn add_block(&self, block: Block, is_best_block: bool) { + let hash = block.header.hash(); + let block_number = block.header.number().clone(); + + let mut chain = self.chain.write(); + chain.block_by_hash.insert(hash, block.clone()); + chain.block_by_number.entry(block_number).or_default().push((block, is_best_block.into())); + } + fn hash_and_length_inner(ex: &Extrinsic) -> (Hash, usize) { let encoded = ex.encode(); (BlakeTwo256::hash(&encoded), encoded.len()) @@ -203,12 +234,36 @@ impl sc_transaction_graph::ChainApi for TestApi { fn validate_transaction( &self, - _at: &BlockId, + at: &BlockId, _source: TransactionSource, uxt: sc_transaction_graph::ExtrinsicFor, ) -> Self::ValidationFuture { self.validation_requests.write().push(uxt.clone()); + match self.block_id_to_number(at) { + Ok(Some(number)) => { + let found_best = self.chain + .read() + .block_by_number + .get(&number) + .map(|blocks| blocks.iter().any(|b| b.1.is_best())) + .unwrap_or(false); + + // If there is no best block, we don't know based on which block we should validate + // the transaction. (This is not required for this test function, but in real + // environment it would fail because of this). + if !found_best { + return ready(Ok( + Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(1)).into()) + )) + } + }, + Ok(None) => return ready(Ok( + Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(2)).into()) + )), + Err(e) => return ready(Err(e)), + } + let (requires, provides) = if let Some(transfer) = uxt.try_transfer() { let chain_nonce = self.chain.read().nonces.get(&transfer.from).cloned().unwrap_or(0); let requires = if chain_nonce == transfer.nonce { @@ -224,7 +279,7 @@ impl sc_transaction_graph::ChainApi for TestApi { }; if self.chain.read().invalid_hashes.contains(&self.hash_and_length(&uxt).0) { - return futures::future::ready(Ok( + return ready(Ok( Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0)).into()) )) } @@ -239,7 +294,7 @@ impl sc_transaction_graph::ChainApi for TestApi { (self.valid_modifier.read())(&mut validity); - futures::future::ready(Ok(Ok(validity))) + ready(Ok(Ok(validity))) } fn block_id_to_number( @@ -266,7 +321,7 @@ impl sc_transaction_graph::ChainApi for TestApi { .read() .block_by_number .get(num) - .map(|blocks| blocks[0].header().hash()), + .and_then(|blocks| blocks.iter().find(|b| b.1.is_best()).map(|b| b.0.header().hash())), }) } @@ -283,7 +338,7 @@ impl sc_transaction_graph::ChainApi for TestApi { .read() .block_by_number .get(num) - .map(|b| b[0].extrinsics().to_vec()), + .map(|b| b[0].0.extrinsics().to_vec()), BlockId::Hash(hash) => self.chain .read() .block_by_hash @@ -332,4 +387,3 @@ pub fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic { let signature = transfer.using_encoded(|e| who.sign(e)).into(); Extrinsic::Transfer { transfer, signature, exhaust_resources_when_not_first: false } } -