Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix quadratic iterations in transaction pool ready set #6256

Merged
merged 9 commits into from
Jun 10, 2020
1 change: 1 addition & 0 deletions client/transaction-pool/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod pool;
mod ready;
mod rotator;
mod validated_pool;
mod tracked_map;

pub mod base_pool;
pub mod watcher;
Expand Down
22 changes: 15 additions & 7 deletions client/transaction-pool/graph/src/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ use std::{

use serde::Serialize;
use log::trace;
use parking_lot::RwLock;
use sp_runtime::traits::Member;
use sp_runtime::transaction_validity::{
TransactionTag as Tag,
};
use sp_transaction_pool::error;

use crate::future::WaitingTransaction;
use crate::base_pool::Transaction;
use crate::{
base_pool::Transaction,
future::WaitingTransaction,
tracked_map::{ReadOnlyTrackedMap, TrackedMap, TrackedSize},
};

/// An in-pool transaction reference.
///
Expand Down Expand Up @@ -113,11 +115,17 @@ pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
/// tags that are provided by Ready transactions
provided_tags: HashMap<Tag, Hash>,
/// Transactions that are ready (i.e. don't have any requirements external to the pool)
ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
ready: TrackedMap<Hash, ReadyTx<Hash, Ex>>,
/// Best transactions that are ready to be included to the block without any other previous transaction.
best: BTreeSet<TransactionRef<Hash, Ex>>,
}

impl<Hash, Ex> TrackedSize for ReadyTx<Hash, Ex> {
fn tracked_size(&self) -> usize {
self.transaction.transaction.bytes
}
}

impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
fn default() -> Self {
ReadyTransactions {
Expand Down Expand Up @@ -468,18 +476,18 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {

/// Returns number of transactions in this queue.
pub fn len(&self) -> usize {
self.ready.read().len()
self.ready.len()
}

/// Returns sum of encoding lengths of all transactions in this queue.
pub fn bytes(&self) -> usize {
self.ready.read().values().fold(0, |acc, tx| acc + tx.transaction.transaction.bytes)
self.ready.bytes()
}
}

/// Iterator of ready transactions ordered by priority.
pub struct BestIterator<Hash, Ex> {
all: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
all: ReadOnlyTrackedMap<Hash, ReadyTx<Hash, Ex>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
Expand Down
189 changes: 189 additions & 0 deletions client/transaction-pool/graph/src/tracked_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// This file is part of Substrate.

// Copyright (C) 2018-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{
collections::HashMap,
sync::{Arc, atomic::{AtomicIsize, Ordering as AtomicOrdering}},
};
use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard};

/// Something that can report it's size.
pub trait TrackedSize {
NikVolf marked this conversation as resolved.
Show resolved Hide resolved
fn tracked_size(&self) -> usize;
}

/// Map with size tracking.
///
/// Size reported might be slightly off and only approximately true.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct TrackedMap<K, V> {
index: Arc<RwLock<HashMap<K, V>>>,
bytes: AtomicIsize,
NikVolf marked this conversation as resolved.
Show resolved Hide resolved
length: AtomicIsize,
}

impl<K, V> Default for TrackedMap<K, V> {
fn default() -> Self {
Self {
index: Arc::new(HashMap::default().into()),
bytes: 0.into(),
length: 0.into(),
}
}
}

impl<K, V> TrackedMap<K, V> {
/// Current tracked length of the content.
pub fn len(&self) -> usize {
std::cmp::max(self.length.load(AtomicOrdering::Relaxed), 0) as usize
}

/// Current sum of content length.
pub fn bytes(&self) -> usize {
std::cmp::max(self.bytes.load(AtomicOrdering::Relaxed), 0) as usize
}

/// Read-only clone of the interior.
pub fn clone(&self) -> ReadOnlyTrackedMap<K, V> {
ReadOnlyTrackedMap(self.index.clone())
}

/// Lock map for read.
pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> {
TrackedMapReadAccess {
inner_guard: self.index.read(),
}
}

/// Lock map for write.
pub fn write<'a>(&'a self) -> TrackedMapWriteAccess<'a, K, V> {
TrackedMapWriteAccess {
inner_guard: self.index.write(),
bytes: &self.bytes,
length: &self.length,
}
}
}

/// Read-only access to map.
///
/// The only thing can be done is .read().
pub struct ReadOnlyTrackedMap<K, V>(Arc<RwLock<HashMap<K, V>>>);

impl<K, V> ReadOnlyTrackedMap<K, V>
where
K: Eq + std::hash::Hash
{
/// Lock map for read.
pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> {
TrackedMapReadAccess {
inner_guard: self.0.read(),
}
}
}

pub struct TrackedMapReadAccess<'a, K, V> {
inner_guard: RwLockReadGuard<'a, HashMap<K, V>>,
}

impl<'a, K, V> TrackedMapReadAccess<'a, K, V>
where
K: Eq + std::hash::Hash
{
/// Returns true if map contains key.
pub fn contains_key(&self, key: &K) -> bool {
self.inner_guard.contains_key(key)
}

/// Returns reference to the contained value by key, if exists.
pub fn get(&self, key: &K) -> Option<&V> {
self.inner_guard.get(key)
}

/// Returns iterator over all values.
pub fn values(&self) -> std::collections::hash_map::Values<K, V> {
self.inner_guard.values()
}
}

pub struct TrackedMapWriteAccess<'a, K, V> {
bytes: &'a AtomicIsize,
length: &'a AtomicIsize,
inner_guard: RwLockWriteGuard<'a, HashMap<K, V>>,
}

impl<'a, K, V> TrackedMapWriteAccess<'a, K, V>
where
K: Eq + std::hash::Hash, V: TrackedSize
{
/// Insert value and return previous (if any).
pub fn insert(&mut self, key: K, val: V) -> Option<V> {
let new_bytes = val.tracked_size();
self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed);
self.length.fetch_add(1, AtomicOrdering::Relaxed);
self.inner_guard.insert(key, val).and_then(|old_val| {
self.bytes.fetch_sub(old_val.tracked_size() as isize, AtomicOrdering::Relaxed);
self.length.fetch_sub(1, AtomicOrdering::Relaxed);
Some(old_val)
})
}

/// Remove value by key.
pub fn remove(&mut self, key: &K) -> Option<V> {
let val = self.inner_guard.remove(key);
if let Some(size) = val.as_ref().map(TrackedSize::tracked_size) {
self.bytes.fetch_sub(size as isize, AtomicOrdering::Relaxed);
self.length.fetch_sub(1, AtomicOrdering::Relaxed);
}
val
}

/// Returns mutable reference to the contained value by key, if exists.
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
self.inner_guard.get_mut(key)
}
}

#[cfg(test)]
mod tests {

use super::*;

impl TrackedSize for i32 {
fn tracked_size(&self) -> usize { *self as usize / 10 }
}

#[test]
fn basic() {
let map = TrackedMap::default();
map.write().insert(5, 10);
map.write().insert(6, 20);

assert_eq!(map.bytes(), 3);
assert_eq!(map.len(), 2);

map.write().insert(6, 30);

assert_eq!(map.bytes(), 4);
assert_eq!(map.len(), 2);

map.write().remove(&6);
assert_eq!(map.bytes(), 1);
assert_eq!(map.len(), 1);
}
}