Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot, prune): highest snapshots tracker #4721

Merged
merged 26 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bbb7de2
feat(engine, snapshot): snapshot hook
shekhirin Sep 20, 2023
f0d66cb
handle errors
shekhirin Sep 20, 2023
b516112
add comments
shekhirin Sep 20, 2023
32bf5f2
add tests
shekhirin Sep 20, 2023
7d86f9d
block level snapshotting
shekhirin Sep 21, 2023
ae0bab6
introduce ranges
shekhirin Sep 21, 2023
9e822ea
Merge remote-tracking branch 'origin/main' into alexey/engine-hook-sn…
shekhirin Sep 21, 2023
6199c49
fix tx number ranges
shekhirin Sep 21, 2023
f63dc8e
rethresult & retherror
shekhirin Sep 21, 2023
fc06691
more tests
shekhirin Sep 21, 2023
d986eb8
feat(snapshot, prune): respect highest snapshots in pruner
shekhirin Sep 21, 2023
0565536
better tests
shekhirin Sep 22, 2023
9fa1810
more comments
shekhirin Sep 22, 2023
2195528
respect block interval in targets
shekhirin Sep 27, 2023
1083908
Merge remote-tracking branch 'origin/main' into alexey/engine-hook-sn…
shekhirin Sep 27, 2023
2c2156c
Merge remote-tracking branch 'origin/alexey/engine-hook-snapshot' int…
shekhirin Sep 27, 2023
1ca626a
fix clippy
shekhirin Sep 27, 2023
3c465b7
Merge remote-tracking branch 'origin/alexey/engine-hook-snapshot' int…
shekhirin Sep 27, 2023
aa35d53
don't account for snapshots in receipts
shekhirin Sep 27, 2023
17e030a
init snapshotter in node/mod.rs
shekhirin Sep 27, 2023
fecbfb7
add crate doc, fix ugly Err(match err { ... })
shekhirin Sep 28, 2023
76d90df
Merge remote-tracking branch 'origin/main' into alexey/engine-hook-sn…
shekhirin Sep 28, 2023
54b0ee9
b256
shekhirin Sep 28, 2023
16c611e
Merge remote-tracking branch 'origin/alexey/engine-hook-snapshot' int…
shekhirin Sep 29, 2023
15f33b8
Merge remote-tracking branch 'origin/main' into alexey/pruner-highest…
shekhirin Sep 29, 2023
4d61b7a
fix clippy
shekhirin Sep 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ reth-payload-builder.workspace = true
reth-basic-payload-builder = { path = "../../crates/payload/basic" }
reth-discv4 = { path = "../../crates/net/discv4" }
reth-prune = { path = "../../crates/prune" }
reth-snapshot = { path = "../../crates/snapshot" }
reth-trie = { path = "../../crates/trie" }

# crypto
Expand Down
10 changes: 10 additions & 0 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
None
};

let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);

let mut hooks = EngineHooks::new();

let pruner_events = if let Some(prune_config) = prune_config {
Expand All @@ -461,6 +463,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
prune_config.block_interval,
prune_config.parts,
self.chain.prune_batch_sizes,
highest_snapshots_rx,
);
let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
Expand All @@ -469,6 +472,13 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Either::Right(stream::empty())
};

let _snapshotter = reth_snapshot::Snapshotter::new(
db,
self.chain.clone(),
self.chain.snapshot_block_interval,
highest_snapshots_tx,
);

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client,
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ where
5,
PruneModes::none(),
PruneBatchSizes::default(),
watch::channel(None).1,
);

let mut hooks = EngineHooks::new();
Expand Down
8 changes: 8 additions & 0 deletions crates/primitives/src/chain/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub static MAINNET: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::mainnet(),
snapshot_block_interval: 500_000,
}
.into()
});
Expand Down Expand Up @@ -107,6 +108,7 @@ pub static GOERLI: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::testnet(),
snapshot_block_interval: 1_000_000,
}
.into()
});
Expand Down Expand Up @@ -153,6 +155,7 @@ pub static SEPOLIA: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::testnet(),
snapshot_block_interval: 1_000_000,
}
.into()
});
Expand Down Expand Up @@ -194,6 +197,7 @@ pub static HOLESKY: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::testnet(),
snapshot_block_interval: 1_000_000,
}
.into()
});
Expand Down Expand Up @@ -303,6 +307,9 @@ pub struct ChainSpec {
/// data coming in.
#[serde(default)]
pub prune_batch_sizes: PruneBatchSizes,

/// The block interval for creating snapshots. Each snapshot will have that much blocks in it.
pub snapshot_block_interval: u64,
}

impl Default for ChainSpec {
Expand All @@ -317,6 +324,7 @@ impl Default for ChainSpec {
deposit_contract: Default::default(),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: Default::default(),
snapshot_block_interval: Default::default(),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/prune/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ reth-primitives.workspace = true
reth-db.workspace = true
reth-provider.workspace = true
reth-interfaces.workspace = true
reth-snapshot = { path = "../snapshot" }

# async
tokio = { workspace = true, features = ["sync"] }

# metrics
reth-metrics.workspace = true
Expand Down
36 changes: 29 additions & 7 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use reth_provider::{
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
TransactionsProvider,
};
use reth_snapshot::HighestSnapshotsTracker;
use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, instrument, trace};
Expand Down Expand Up @@ -48,6 +49,8 @@ pub struct Pruner<DB> {
/// Maximum entries to prune per block, per prune part.
batch_sizes: PruneBatchSizes,
listeners: EventListeners<PrunerEvent>,
#[allow(dead_code)]
highest_snapshots_tracker: HighestSnapshotsTracker,
}

impl<DB: Database> Pruner<DB> {
Expand All @@ -58,6 +61,7 @@ impl<DB: Database> Pruner<DB> {
min_block_interval: usize,
modes: PruneModes,
batch_sizes: PruneBatchSizes,
highest_snapshots_tracker: HighestSnapshotsTracker,
) -> Self {
Self {
metrics: Metrics::default(),
Expand All @@ -67,6 +71,7 @@ impl<DB: Database> Pruner<DB> {
modes,
batch_sizes,
listeners: Default::default(),
highest_snapshots_tracker,
}
}

Expand All @@ -90,9 +95,11 @@ impl<DB: Database> Pruner<DB> {
let provider = self.provider_factory.provider_rw()?;

let mut done = true;

let mut parts_done = BTreeMap::new();

// TODO(alexey): prune snapshot parts of data (headers, transactions)
// let highest_snapshots = *self.highest_snapshots_tracker.borrow();

if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_receipts(tip_block_number)?
{
Expand Down Expand Up @@ -951,24 +958,33 @@ mod tests {
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_stages::test_utils::TestTransaction;
use std::{collections::BTreeMap, ops::AddAssign};
use tokio::sync::watch;

#[test]
fn is_pruning_needed() {
let db = create_test_rw_db();
let pruner =
Pruner::new(db, MAINNET.clone(), 5, PruneModes::none(), PruneBatchSizes::default());
let mut pruner = Pruner::new(
db,
MAINNET.clone(),
5,
PruneModes::none(),
PruneBatchSizes::default(),
watch::channel(None).1,
);

// No last pruned block number was set before
let first_block_number = 1;
assert!(pruner.is_pruning_needed(first_block_number));
pruner.last_pruned_block_number = Some(first_block_number);

// Delta is not less than min block interval
// Tip block number delta is >= than min block interval
let second_block_number = first_block_number + pruner.min_block_interval as u64;
assert!(pruner.is_pruning_needed(second_block_number));
pruner.last_pruned_block_number = Some(second_block_number);

// Delta is less than min block interval
// Tip block number delta is < than min block interval
let third_block_number = second_block_number;
assert!(pruner.is_pruning_needed(third_block_number));
assert!(!pruner.is_pruning_needed(third_block_number));
}

#[test]
Expand Down Expand Up @@ -1006,6 +1022,7 @@ mod tests {
PruneModes { receipts: Some(prune_mode), ..Default::default() },
// Less than total amount of blocks to prune to test the batching logic
PruneBatchSizes::default().with_receipts(10),
watch::channel(None).1,
);

let next_tx_number_to_prune = tx
Expand Down Expand Up @@ -1100,6 +1117,7 @@ mod tests {
PruneModes { transaction_lookup: Some(prune_mode), ..Default::default() },
// Less than total amount of blocks to prune to test the batching logic
PruneBatchSizes::default().with_transaction_lookup(10),
watch::channel(None).1,
);

let next_tx_number_to_prune = tx
Expand Down Expand Up @@ -1197,6 +1215,7 @@ mod tests {
PruneModes { sender_recovery: Some(prune_mode), ..Default::default() },
// Less than total amount of blocks to prune to test the batching logic
PruneBatchSizes::default().with_transaction_senders(10),
watch::channel(None).1,
);

let next_tx_number_to_prune = tx
Expand Down Expand Up @@ -1303,6 +1322,7 @@ mod tests {
PruneModes { account_history: Some(prune_mode), ..Default::default() },
// Less than total amount of blocks to prune to test the batching logic
PruneBatchSizes::default().with_account_history(2000),
watch::channel(None).1,
);

let provider = tx.inner_rw();
Expand Down Expand Up @@ -1433,6 +1453,7 @@ mod tests {
PruneModes { storage_history: Some(prune_mode), ..Default::default() },
// Less than total amount of blocks to prune to test the batching logic
PruneBatchSizes::default().with_storage_history(2000),
watch::channel(None).1,
);

let provider = tx.inner_rw();
Expand Down Expand Up @@ -1574,7 +1595,8 @@ mod tests {
..Default::default()
},
// Less than total amount of blocks to prune to test the batching logic
PruneBatchSizes::default().with_receipts(10),
PruneBatchSizes::default().with_storage_history(10),
watch::channel(None).1,
);

let result = pruner.prune_receipts_by_logs(&provider, tip);
Expand Down
4 changes: 4 additions & 0 deletions crates/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ reth-db.workspace = true
reth-provider.workspace = true
reth-interfaces.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }

# misc
thiserror.workspace = true
tracing.workspace = true

[dev-dependencies]
# reth
Expand Down
5 changes: 4 additions & 1 deletion crates/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ mod error;
mod snapshotter;

pub use error::SnapshotterError;
pub use snapshotter::{SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult};
pub use snapshotter::{
HighestSnapshots, HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult,
SnapshotterWithResult,
};
Loading
Loading