Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine, snapshot): snapshot hook #4690

Merged
merged 16 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"crates/rpc/rpc-engine-api",
"crates/rpc/rpc-types",
"crates/rpc/rpc-testing-util",
"crates/snapshot",
"crates/stages",
"crates/storage/codecs",
"crates/storage/db",
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reth-rpc-types.workspace = true
reth-tasks.workspace = true
reth-payload-builder.workspace = true
reth-prune = { path = "../../prune" }
reth-snapshot = { path = "../../snapshot" }
reth-rpc-types-compat.workspace = true
# async
tokio = { workspace = true, features = ["sync"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl EngineHooksController {
) -> Poll<Result<PolledHook, EngineHookError>> {
let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending };

match hook.poll(cx, args) {
match hook.poll(cx, args)? {
mattsse marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready((event, action)) => {
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };

Expand Down Expand Up @@ -109,7 +109,7 @@ impl EngineHooksController {
return Poll::Pending
}

if let Poll::Ready((event, action)) = hook.poll(cx, args) {
if let Poll::Ready((event, action)) = hook.poll(cx, args)? {
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };

debug!(
Expand Down
7 changes: 6 additions & 1 deletion crates/consensus/beacon/src/engine/hooks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub(crate) use controller::{EngineHooksController, PolledHook};
mod prune;
pub use prune::PruneHook;

mod snapshot;
pub use snapshot::SnapshotHook;

/// Collection of [engine hooks][`EngineHook`].
#[derive(Default)]
pub struct EngineHooks {
Expand Down Expand Up @@ -41,7 +44,7 @@ pub trait EngineHook: Send + Sync + 'static {
&mut self,
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)>;
) -> Poll<reth_interfaces::Result<(EngineHookEvent, Option<EngineHookAction>)>>;

/// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs.
fn db_access_level(&self) -> EngineHookDBAccessLevel;
Expand All @@ -52,6 +55,8 @@ pub trait EngineHook: Send + Sync + 'static {
pub struct EngineContext {
/// Tip block number.
pub tip_block_number: BlockNumber,
/// Finalized block number, if known.
pub finalized_block_number: Option<BlockNumber>,
}

/// An event emitted when [hook][`EngineHook`] is polled.
Expand Down
13 changes: 7 additions & 6 deletions crates/consensus/beacon/src/engine/hooks/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
fn poll_pruner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)> {
) -> Poll<reth_interfaces::Result<(EngineHookEvent, Option<EngineHookAction>)>> {
let result = match self.pruner_state {
PrunerState::Idle(_) => return Poll::Pending,
PrunerState::Running(ref mut fut) => {
Expand Down Expand Up @@ -73,14 +73,15 @@ impl<DB: Database + 'static> PruneHook<DB> {
}
};

Poll::Ready((event, None))
Poll::Ready(Ok((event, None)))
}

/// This will try to spawn the pruner if it is idle:
/// 1. Check if pruning is needed through [Pruner::is_pruning_needed].
/// 2a. If pruning is needed, pass tip block number to the [Pruner::run] and spawn it in a
/// 2.
/// 1. If pruning is needed, pass tip block number to the [Pruner::run] and spawn it in a
/// separate task. Set pruner state to [PrunerState::Running].
/// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle].
/// 2. If pruning is not needed, set pruner state back to [PrunerState::Idle].
///
/// If pruner is already running, do nothing.
fn try_spawn_pruner(
Expand Down Expand Up @@ -131,11 +132,11 @@ impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
&mut self,
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)> {
) -> Poll<reth_interfaces::Result<(EngineHookEvent, Option<EngineHookAction>)>> {
// Try to spawn a pruner
match self.try_spawn_pruner(ctx.tip_block_number) {
Some((EngineHookEvent::NotReady, _)) => return Poll::Pending,
Some((event, action)) => return Poll::Ready((event, action)),
Some((event, action)) => return Poll::Ready(Ok((event, action))),
None => (),
}

Expand Down
158 changes: 158 additions & 0 deletions crates/consensus/beacon/src/engine/hooks/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//! Snapshot hook for the engine implementation.

use crate::{
engine::hooks::{
EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHookEvent,
},
hooks::EngineHookDBAccessLevel,
};
use futures::FutureExt;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
use reth_snapshot::{Snapshotter, SnapshotterError, SnapshotterWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;

/// Manages snapshotting under the control of the engine.
///
/// This type controls the [Snapshotter].
pub struct SnapshotHook<DB> {
/// The current state of the snapshotter.
state: SnapshotterState<DB>,
/// The type that can spawn the snapshotter task.
task_spawner: Box<dyn TaskSpawner>,
}

impl<DB: Database + 'static> SnapshotHook<DB> {
/// Create a new instance
pub fn new(snapshotter: Snapshotter<DB>, task_spawner: Box<dyn TaskSpawner>) -> Self {
Self { state: SnapshotterState::Idle(Some(snapshotter)), task_spawner }
}

/// Advances the snapshotter state.
///
/// This checks for the result in the channel, or returns pending if the snapshotter is idle.
fn poll_snapshotter(
&mut self,
cx: &mut Context<'_>,
) -> Poll<reth_interfaces::Result<(EngineHookEvent, Option<EngineHookAction>)>> {
let result = match self.state {
SnapshotterState::Idle(_) => return Poll::Pending,
SnapshotterState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};

let event = match result {
Ok((snapshotter, result)) => {
self.state = SnapshotterState::Idle(Some(snapshotter));

match result {
Ok(_) => EngineHookEvent::Finished(Ok(())),
Err(err) => EngineHookEvent::Finished(Err(match err {
SnapshotterError::InconsistentData(_) => {
EngineHookError::Internal(Box::new(err))
}
SnapshotterError::Interface(err) => err.into(),
SnapshotterError::Database(err) => {
reth_interfaces::Error::Database(err).into()
}
SnapshotterError::Provider(err) => {
reth_interfaces::Error::Provider(err).into()
}
})),
}
}
Err(_) => {
// failed to receive the snapshotter
EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
}
};

Poll::Ready(Ok((event, None)))
}

/// This will try to spawn the snapshotter if it is idle:
/// 1. Check if snapshotting is needed through [Snapshotter::get_snapshot_request] and then
/// [SnapshotRequest::any_some](reth_snapshot::SnapshotRequest::any).
/// 2.
/// 1. If snapshotting is needed, pass snapshot request to the [Snapshotter::run] and spawn
/// it in a separate task. Set snapshotter state to [SnapshotterState::Running].
/// 2. If snapshotting is not needed, set snapshotter state back to
/// [SnapshotterState::Idle].
///
/// If snapshotter is already running, do nothing.
fn try_spawn_snapshotter(
&mut self,
finalized_block_number: BlockNumber,
) -> reth_interfaces::Result<Option<(EngineHookEvent, Option<EngineHookAction>)>> {
Ok(match &mut self.state {
SnapshotterState::Idle(snapshotter) => {
let Some(mut snapshotter) = snapshotter.take() else { return Ok(None) };

let request = snapshotter.get_snapshot_request(finalized_block_number)?;

// Check if the snapshotting of any parts has been requested.
if request.any() {
let (tx, rx) = oneshot::channel();
self.task_spawner.spawn_critical_blocking(
"snapshotter task",
Box::pin(async move {
let result = snapshotter.run(request);
let _ = tx.send((snapshotter, result));
}),
);
self.state = SnapshotterState::Running(rx);

Some((EngineHookEvent::Started, None))
} else {
self.state = SnapshotterState::Idle(Some(snapshotter));
Some((EngineHookEvent::NotReady, None))
}
}
SnapshotterState::Running(_) => None,
})
}
}

impl<DB: Database + 'static> EngineHook for SnapshotHook<DB> {
fn name(&self) -> &'static str {
"Snapshot"
}

fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<reth_interfaces::Result<(EngineHookEvent, Option<EngineHookAction>)>> {
let Some(finalized_block_number) = ctx.finalized_block_number else {
return Poll::Ready(Ok((EngineHookEvent::NotReady, None)))
};

// Try to spawn a snapshotter
match self.try_spawn_snapshotter(finalized_block_number)? {
Some((EngineHookEvent::NotReady, _)) => return Poll::Pending,
Some((event, action)) => return Poll::Ready(Ok((event, action))),
None => (),
}

// Poll snapshotter and check its status
self.poll_snapshotter(cx)
}

fn db_access_level(&self) -> EngineHookDBAccessLevel {
EngineHookDBAccessLevel::ReadOnly
}
}

/// The possible snapshotter states within the sync controller.
///
/// [SnapshotterState::Idle] means that the snapshotter is currently idle.
/// [SnapshotterState::Running] means that the snapshotter is currently running.
enum SnapshotterState<DB> {
/// Snapshotter is idle.
Idle(Option<Snapshotter<DB>>),
/// Snapshotter is running and waiting for a response
Running(oneshot::Receiver<SnapshotterWithResult<DB>>),
}
10 changes: 8 additions & 2 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,10 @@ where
// any engine messages until it's finished.
if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
cx,
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
EngineContext {
tip_block_number: this.blockchain.canonical_tip().number,
finalized_block_number: this.blockchain.finalized_block_number()?,
},
)? {
this.on_hook_result(result)?;
}
Expand Down Expand Up @@ -1796,7 +1799,10 @@ where
if !this.forkchoice_state_tracker.is_latest_invalid() {
if let Poll::Ready(result) = this.hooks.poll_next_hook(
cx,
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
EngineContext {
tip_block_number: this.blockchain.canonical_tip().number,
finalized_block_number: this.blockchain.finalized_block_number()?,
},
this.sync.is_pipeline_active(),
)? {
this.on_hook_result(result)?;
Expand Down
30 changes: 30 additions & 0 deletions crates/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "reth-snapshot"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = """
Snapshotting implementation
"""

[dependencies]
# reth
reth-primitives.workspace = true
reth-db.workspace = true
reth-provider.workspace = true
reth-interfaces.workspace = true

# misc
thiserror.workspace = true

[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { path = "../stages", features = ["test-utils"] }

# misc

assert_matches.workspace = true
18 changes: 18 additions & 0 deletions crates/snapshot/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use reth_db::DatabaseError;
use reth_provider::ProviderError;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum SnapshotterError {
#[error("Inconsistent data: {0}")]
InconsistentData(&'static str),

#[error("An interface error occurred.")]
Interface(#[from] reth_interfaces::Error),

#[error(transparent)]
Database(#[from] DatabaseError),

#[error(transparent)]
Provider(#[from] ProviderError),
}
5 changes: 5 additions & 0 deletions crates/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod error;
mod snapshotter;

pub use error::SnapshotterError;
pub use snapshotter::{SnapshotRequest, Snapshotter, SnapshotterResult, SnapshotterWithResult};
Loading
Loading