Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(provider): BlockReader::sealed_block_with_senders_range #8750

Merged
merged 6 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/primitives/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ pub struct Header {
pub extra_data: Bytes,
}

impl AsRef<Self> for Header {
fn as_ref(&self) -> &Self {
self
}
}

impl Default for Header {
fn default() -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ where
// Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range.
let blocks = provider.get_take_block_range::<false>(range.clone())?;
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
let blocks = provider.sealed_block_with_senders_range(range.clone())?;
let previous_input = self.post_unwind_commit_input.replace(Chain::new(
blocks,
bundle_state_with_receipts,
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ impl<DB: Database> BlockReader for ProviderFactory<DB> {
) -> ProviderResult<Vec<BlockWithSenders>> {
self.provider()?.block_with_senders_range(range)
}

fn sealed_block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
self.provider()?.sealed_block_with_senders_range(range)
}
}

impl<DB: Database> TransactionsProvider for ProviderFactory<DB> {
Expand Down
195 changes: 130 additions & 65 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ impl<TX: DbTx> DatabaseProvider<TX> {
&self.tx
}

/// Returns a reference to the [`ChainSpec`].
pub fn chain_spec(&self) -> &ChainSpec {
&self.chain_spec
}

/// Return full table as Vec
pub fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DatabaseError>
where
Expand Down Expand Up @@ -353,9 +358,69 @@ impl<TX: DbTx> DatabaseProvider<TX> {
)
}

/// Returns a reference to the [`ChainSpec`].
pub fn chain_spec(&self) -> &ChainSpec {
&self.chain_spec
/// Returns a range of blocks from the database, along with the senders of each
/// transaction in the blocks.
///
/// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
/// construct a block for the provided inputs
fn block_with_senders_range<H, HF, B, BF>(
&self,
headers_range: HF,
assemble_block: BF,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<B>>
where
H: AsRef<Header>,
HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
BF: Fn(
H,
Vec<TransactionSigned>,
Vec<Header>,
Option<Withdrawals>,
Option<Requests>,
Vec<Address>,
) -> ProviderResult<B>,
{
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

self.process_block_range(
range,
headers_range,
|tx_range, header, ommers, withdrawals, requests| {
let (body, senders) = if tx_range.is_empty() {
(Vec::new(), Vec::new())
} else {
let body = self
.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect::<Vec<TransactionSigned>>();
// fetch senders from the senders table
let known_senders = senders_cursor
.walk_range(tx_range.clone())?
.collect::<Result<HashMap<_, _>, _>>()?;

let mut senders = Vec::with_capacity(body.len());
for (tx_num, tx) in tx_range.zip(body.iter()) {
match known_senders.get(&tx_num) {
None => {
// recover the sender from the transaction if not found
let sender = tx
.recover_signer_unchecked()
.ok_or_else(|| ProviderError::SenderRecoveryError)?;
senders.push(sender);
}
Some(sender) => senders.push(*sender),
}
}

(body, senders)
};

assemble_block(header, body, ommers, withdrawals, requests, senders)
},
)
}
}

Expand Down Expand Up @@ -1317,15 +1382,18 @@ impl<TX: DbTx> BlockNumReader for DatabaseProvider<TX> {
}

impl<Tx: DbTx> DatabaseProvider<Tx> {
fn process_block_range<F, R>(
fn process_block_range<F, H, HF, R>(
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
&self,
range: RangeInclusive<BlockNumber>,
headers_range: HF,
mut assemble_block: F,
) -> ProviderResult<Vec<R>>
where
H: AsRef<Header>,
HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
F: FnMut(
Range<TxNumber>,
Header,
H,
Vec<Header>,
Option<Withdrawals>,
Option<Requests>,
Expand All @@ -1338,53 +1406,59 @@ impl<Tx: DbTx> DatabaseProvider<Tx> {
let len = range.end().saturating_sub(*range.start()) as usize;
let mut blocks = Vec::with_capacity(len);

let headers = self.headers_range(range)?;
let headers = headers_range(range)?;
let mut ommers_cursor = self.tx.cursor_read::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = self.tx.cursor_read::<tables::BlockWithdrawals>()?;
let mut requests_cursor = self.tx.cursor_read::<tables::BlockRequests>()?;
let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;

for header in headers {
let header_ref = header.as_ref();
// If the body indices are not found, this means that the transactions either do
// not exist in the database yet, or they do exit but are
// not indexed. If they exist but are not indexed, we don't
// have enough information to return the block anyways, so
// we skip the block.
if let Some((_, block_body_indices)) = block_body_cursor.seek_exact(header.number)? {
if let Some((_, block_body_indices)) =
block_body_cursor.seek_exact(header_ref.number)?
{
let tx_range = block_body_indices.tx_num_range();

// If we are past shanghai, then all blocks should have a withdrawal list,
// even if empty
let withdrawals =
if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) {
if self.chain_spec.is_shanghai_active_at_timestamp(header_ref.timestamp) {
Some(
withdrawals_cursor
.seek_exact(header.number)?
.seek_exact(header_ref.number)?
.map(|(_, w)| w.withdrawals)
.unwrap_or_default(),
)
} else {
None
};
let requests = if self.chain_spec.is_prague_active_at_timestamp(header.timestamp) {
Some(requests_cursor.seek_exact(header.number)?.unwrap_or_default().1)
} else {
None
};
let requests =
if self.chain_spec.is_prague_active_at_timestamp(header_ref.timestamp) {
Some(requests_cursor.seek_exact(header_ref.number)?.unwrap_or_default().1)
} else {
None
};
let ommers =
if self.chain_spec.final_paris_total_difficulty(header.number).is_some() {
if self.chain_spec.final_paris_total_difficulty(header_ref.number).is_some() {
Vec::new()
} else {
ommers_cursor
.seek_exact(header.number)?
.seek_exact(header_ref.number)?
.map(|(_, o)| o.ommers)
.unwrap_or_default()
};

if let Ok(b) = assemble_block(tx_range, header, ommers, withdrawals, requests) {
blocks.push(b);
}
}
}

Ok(blocks)
}
}
Expand Down Expand Up @@ -1515,62 +1589,53 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {

fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
self.process_block_range(range, |tx_range, header, ommers, withdrawals, requests| {
let body = if tx_range.is_empty() {
Vec::new()
} else {
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect()
};
Ok(Block { header, body, ommers, withdrawals, requests })
})
self.process_block_range(
range,
|range| self.headers_range(range),
|tx_range, header, ommers, withdrawals, requests| {
let body = if tx_range.is_empty() {
Vec::new()
} else {
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect()
};
Ok(Block { header, body, ommers, withdrawals, requests })
},
)
}

fn block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

self.process_block_range(range, |tx_range, header, ommers, withdrawals, requests| {
let (body, senders) = if tx_range.is_empty() {
(Vec::new(), Vec::new())
} else {
let body = self
.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect::<Vec<TransactionSigned>>();
// fetch senders from the senders table
let known_senders =
senders_cursor
.walk_range(tx_range.clone())?
.collect::<Result<HashMap<_, _>, _>>()?;

let mut senders = Vec::with_capacity(body.len());
for (tx_num, tx) in tx_range.zip(body.iter()) {
match known_senders.get(&tx_num) {
None => {
// recover the sender from the transaction if not found
let sender = tx
.recover_signer_unchecked()
.ok_or_else(|| ProviderError::SenderRecoveryError)?;
senders.push(sender);
}
Some(sender) => senders.push(*sender),
}
}

(body, senders)
};
self.block_with_senders_range(
|range| self.headers_range(range),
|header, body, ommers, withdrawals, requests, senders| {
Block { header, body, ommers, withdrawals, requests }
.try_with_senders_unchecked(senders)
.map_err(|_| ProviderError::SenderRecoveryError)
},
range,
)
}

Block { header, body, ommers, withdrawals, requests }
.try_with_senders_unchecked(senders)
.map_err(|_| ProviderError::SenderRecoveryError)
})
fn sealed_block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
self.block_with_senders_range(
|range| self.sealed_headers_range(range),
|header, body, ommers, withdrawals, requests, senders| {
SealedBlockWithSenders::new(
SealedBlock { header, body, ommers, withdrawals, requests },
senders,
)
.ok_or(ProviderError::SenderRecoveryError)
},
range,
)
}
}

Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ where
) -> ProviderResult<Vec<BlockWithSenders>> {
self.database.block_with_senders_range(range)
}

fn sealed_block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
self.database.sealed_block_with_senders_range(range)
}
}

impl<DB> TransactionsProvider for BlockchainProvider<DB>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,13 @@ impl BlockReader for StaticFileProvider {
) -> ProviderResult<Vec<BlockWithSenders>> {
Err(ProviderError::UnsupportedProvider)
}

fn sealed_block_with_senders_range(
&self,
_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
Err(ProviderError::UnsupportedProvider)
}
}

impl WithdrawalsProvider for StaticFileProvider {
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/test_utils/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,13 @@ impl BlockReader for MockEthProvider {
) -> ProviderResult<Vec<BlockWithSenders>> {
Ok(vec![])
}

fn sealed_block_with_senders_range(
&self,
_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
Ok(vec![])
}
}

impl BlockReaderIdExt for MockEthProvider {
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ impl BlockReader for NoopProvider {
) -> ProviderResult<Vec<BlockWithSenders>> {
Ok(vec![])
}

fn sealed_block_with_senders_range(
&self,
_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
Ok(vec![])
}
}

impl BlockReaderIdExt for NoopProvider {
Expand Down
11 changes: 8 additions & 3 deletions crates/storage/storage-api/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,19 @@ pub trait BlockReader:
/// Note: returns only available blocks
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>>;

/// retrieves a range of blocks from the database, along with the senders of each
/// Returns a range of blocks from the database, along with the senders of each
/// transaction in the blocks.
///
/// The `transaction_kind` parameter determines whether to return its hash
fn block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>>;

/// Returns a range of sealed blocks from the database, along with the senders of each
/// transaction in the blocks.
fn sealed_block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>>;
}

/// Trait extension for `BlockReader`, for types that implement `BlockId` conversion.
Expand Down
Loading