Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap sidecare in arcs #11554

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 0 additions & 2 deletions crates/primitives/src/transaction/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ pub use alloy_eips::eip4844::BlobTransactionSidecar;
#[cfg(feature = "c-kzg")]
pub use alloy_eips::eip4844::BlobTransactionValidationError;

use alloc::vec::Vec;

/// A response to `GetPooledTransactions` that includes blob data, their commitments, and their
/// corresponding proofs.
///
Expand Down
42 changes: 26 additions & 16 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl BlobStore for DiskFileBlobStore {
stat
}

fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.inner.get_one(tx)
}

Expand All @@ -115,14 +115,14 @@ impl BlobStore for DiskFileBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
self.inner.get_all(txs)
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl BlobStore for DiskFileBlobStore {

struct DiskFileBlobStoreInner {
blob_dir: PathBuf,
blob_cache: Mutex<LruMap<TxHash, BlobTransactionSidecar, ByLength>>,
blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
size_tracker: BlobStoreSize,
file_lock: RwLock<()>,
txs_to_delete: RwLock<HashSet<B256>>,
Expand Down Expand Up @@ -206,7 +206,7 @@ impl DiskFileBlobStoreInner {
fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut buf = Vec::with_capacity(data.fields_len());
data.encode(&mut buf);
self.blob_cache.lock().insert(tx, data);
self.blob_cache.lock().insert(tx, Arc::new(data));
let size = self.write_one_encoded(tx, &buf)?;

self.size_tracker.add_size(size);
Expand All @@ -228,7 +228,7 @@ impl DiskFileBlobStoreInner {
{
let mut cache = self.blob_cache.lock();
for (tx, data) in txs {
cache.insert(tx, data);
cache.insert(tx, Arc::new(data));
}
}
let mut add = 0;
Expand Down Expand Up @@ -279,15 +279,19 @@ impl DiskFileBlobStoreInner {
}

/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
return Ok(Some(blob.clone()))
}
let blob = self.read_one(tx)?;

if let Some(blob) = &blob {
self.blob_cache.lock().insert(tx, blob.clone());
let blob_arc = Arc::new(blob.clone());
self.blob_cache.lock().insert(tx, blob_arc.clone());
return Ok(Some(blob_arc.clone()))
}
Ok(blob)

Ok(None)
}

/// Returns the path to the blob file for the given transaction hash.
Expand All @@ -312,6 +316,7 @@ impl DiskFileBlobStoreInner {
}
}
};

BlobTransactionSidecar::decode(&mut data.as_slice())
.map(Some)
.map_err(BlobStoreError::DecodeError)
Expand Down Expand Up @@ -375,7 +380,7 @@ impl DiskFileBlobStoreInner {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
let mut cache_miss = Vec::new();
{
Expand All @@ -397,8 +402,9 @@ impl DiskFileBlobStoreInner {
}
let mut cache = self.blob_cache.lock();
for (tx, data) in from_disk {
cache.insert(tx, data.clone());
res.push((tx, data));
let arc = Arc::new(data.clone());
cache.insert(tx, arc.clone());
res.push((tx, arc.clone()));
}

Ok(res)
Expand All @@ -408,7 +414,7 @@ impl DiskFileBlobStoreInner {
///
/// Returns an error if there are any missing blobs.
#[inline]
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
for tx in txs {
let blob = self.get_one(tx)?.ok_or_else(|| BlobStoreError::MissingSidecar(tx))?;
Expand Down Expand Up @@ -491,9 +497,10 @@ pub enum OpenDiskFileBlobStore {

#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;

use super::*;

fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
Expand All @@ -519,14 +526,17 @@ mod tests {
let blobs = rng_blobs(10);
let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
store.insert_all(blobs.clone()).unwrap();

// all cached
for (tx, blob) in &blobs {
assert!(store.is_cached(tx));
assert_eq!(store.get(*tx).unwrap().unwrap(), *blob);
let b = (*(store.get(*tx).unwrap().unwrap())).clone();
assert_eq!(b, *blob);
}

let all = store.get_all(all_hashes.clone()).unwrap();
for (tx, blob) in all {
assert!(blobs.contains(&(tx, blob)), "missing blob {tx:?}");
assert!(blobs.contains(&(tx, (*blob).clone())), "missing blob {tx:?}");
}

assert!(store.contains(all_hashes[0]).unwrap());
Expand Down
14 changes: 7 additions & 7 deletions crates/transaction-pool/src/blobstore/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct InMemoryBlobStore {
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<B256, BlobTransactionSidecar>>,
store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
size_tracker: BlobStoreSize,
}

Expand Down Expand Up @@ -75,7 +75,7 @@ impl BlobStore for InMemoryBlobStore {
}

// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let store = self.inner.store.read();
Ok(store.get(&tx).cloned())
}
Expand All @@ -88,7 +88,7 @@ impl BlobStore for InMemoryBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut items = Vec::with_capacity(txs.len());
let store = self.inner.store.read();
for tx in txs {
Expand All @@ -100,7 +100,7 @@ impl BlobStore for InMemoryBlobStore {
Ok(items)
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let mut items = Vec::with_capacity(txs.len());
let store = self.inner.store.read();
for tx in txs {
Expand Down Expand Up @@ -150,7 +150,7 @@ impl BlobStore for InMemoryBlobStore {

/// Removes the given blob from the store and returns the size of the blob that was removed.
#[inline]
fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) -> usize {
fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}

Expand All @@ -159,11 +159,11 @@ fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) ->
/// We don't need to handle the size updates for replacements because transactions are unique.
#[inline]
fn insert_size(
store: &mut HashMap<B256, BlobTransactionSidecar>,
store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
tx: B256,
blob: BlobTransactionSidecar,
) -> usize {
let add = blob.size();
store.insert(tx, blob);
store.insert(tx, Arc::new(blob));
add
}
7 changes: 4 additions & 3 deletions crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
};
use std::sync::Arc;
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};

pub mod disk;
Expand Down Expand Up @@ -44,7 +45,7 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn cleanup(&self) -> BlobStoreCleanupStat;

/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
Expand All @@ -58,13 +59,13 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact
/// order they were requested.
///
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_by_versioned_hashes(
Expand Down
7 changes: 4 additions & 3 deletions crates/transaction-pool/src/blobstore/noop.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar};
use alloy_eips::eip4844::BlobAndProofV1;
use alloy_primitives::B256;
Expand Down Expand Up @@ -28,7 +29,7 @@ impl BlobStore for NoopBlobStore {
BlobStoreCleanupStat::default()
}

fn get(&self, _tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, _tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

Expand All @@ -39,11 +40,11 @@ impl BlobStore for NoopBlobStore {
fn get_all(
&self,
_txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(vec![])
}
Expand Down
6 changes: 3 additions & 3 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,21 +547,21 @@ where
self.pool.unique_senders()
}

fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get(tx_hash)
}

fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
self.pool.blob_store().get_all(tx_hashes)
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get_exact(tx_hashes)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-pool/src/maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
.flatten()
.and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction(
tx, sidecar,
tx, (*sidecar).clone(),
)
.ok()
})
Expand Down
6 changes: 3 additions & 3 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,21 @@ impl TransactionPool for NoopTransactionPool {
Default::default()
}

fn get_blob(&self, _tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(&self, _tx_hash: TxHash) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

fn get_all_blobs(
&self,
_tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if tx_hashes.is_empty() {
return Ok(vec![])
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ where
/// Caution: this assumes the given transaction is eip-4844
fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option<BlobTransaction> {
if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, (*sidecar).clone()) {
return Some(blob)
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ pub trait TransactionPool: Send + Sync + Clone {

/// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob
/// store.
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exists in the
/// blob store.
Expand All @@ -440,7 +440,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order
/// they were requested.
Expand All @@ -449,7 +449,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_blobs_for_versioned_hashes(
Expand Down
Loading