Skip to content

Commit d427361

Browse files
authored
feat: graceful shutdown (#150)
* feat: graceful shutdown * introduce unwind for graceful shutdown * refactor graceful shutdown * implement graceful shutdown test case * clenaup and add test batch data * cleanup * merge diff * use block data bypass * feat: introduce mainnet block data sample for testing * revert unfalized data on startup * add finalized data * cleanup * doc comments * cleanup * address feedback * address feedback
1 parent 8ebccb3 commit d427361

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3115
-401
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ reth-payload-primitives = { git = "https://github.com/scroll-tech/reth.git", def
152152
reth-primitives = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
153153
reth-primitives-traits = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
154154
reth-provider = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
155+
reth-rpc-builder = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
155156
reth-rpc-server-types = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
156157
reth-tasks = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
157158
reth-tokio-util = { git = "https://github.com/scroll-tech/reth.git", default-features = false }

crates/database/db/src/db.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ mod test {
413413
let mut u = Unstructured::new(&bytes);
414414

415415
// Initially should return None
416-
let latest_safe = db.get_latest_safe_l2_block().await.unwrap();
416+
let latest_safe = db.get_latest_safe_l2_info().await.unwrap();
417417
assert!(latest_safe.is_none());
418418

419419
// Generate and insert a batch
@@ -449,8 +449,8 @@ mod test {
449449
.unwrap();
450450

451451
// Should return the highest safe block (block 201)
452-
let latest_safe = db.get_latest_safe_l2_block().await.unwrap();
453-
assert_eq!(latest_safe, Some(safe_block_2));
452+
let latest_safe = db.get_latest_safe_l2_info().await.unwrap();
453+
assert_eq!(latest_safe, Some((safe_block_2, batch_info)));
454454
}
455455

456456
#[tokio::test]

crates/database/db/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ mod models;
1313
pub use models::*;
1414

1515
mod operations;
16-
pub use operations::DatabaseOperations;
16+
pub use operations::{DatabaseOperations, UnwindResult};
1717

1818
mod transaction;
1919
pub use transaction::DatabaseTransaction;

crates/database/db/src/models/batch_commit.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ impl From<Model> for BatchCommitData {
6565
blob_versioned_hash: value
6666
.blob_hash
6767
.map(|b| b.as_slice().try_into().expect("data persisted in database is valid")),
68+
finalized_block_number: value.finalized_block_number.map(|b| b as u64),
6869
}
6970
}
7071
}

crates/database/db/src/models/l2_block.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ impl Model {
1717
pub(crate) fn block_info(&self) -> BlockInfo {
1818
BlockInfo { number: self.block_number as u64, hash: B256::from_slice(&self.block_hash) }
1919
}
20+
21+
pub(crate) fn batch_info(&self) -> Option<BatchInfo> {
22+
self.batch_hash.as_ref().map(|hash| BatchInfo {
23+
index: self.batch_index.expect("batch index must be present if batch hash is present")
24+
as u64,
25+
hash: B256::from_slice(hash),
26+
})
27+
}
2028
}
2129

2230
/// The relation for the batch input model.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use rollup_node_primitives::Metadata;
2+
use sea_orm::{entity::prelude::*, ActiveValue};
3+
4+
/// A database model that represents the metadata for the rollup node.
5+
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
6+
#[sea_orm(table_name = "metadata")]
7+
pub struct Model {
8+
/// The metadata key.
9+
#[sea_orm(primary_key)]
10+
pub key: String,
11+
/// The metadata value.
12+
pub value: String,
13+
}
14+
15+
/// The relation for the metadata model.
16+
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
17+
pub enum Relation {}
18+
19+
/// The active model behavior for the metadata model.
20+
impl ActiveModelBehavior for ActiveModel {}
21+
22+
impl From<Metadata> for ActiveModel {
23+
fn from(metadata: Metadata) -> Self {
24+
Self {
25+
key: ActiveValue::Set("l1_finalized_block".to_owned()),
26+
value: ActiveValue::Set(metadata.l1_finalized_block.to_string()),
27+
}
28+
}
29+
}
30+
31+
impl From<Model> for Metadata {
32+
fn from(value: Model) -> Self {
33+
debug_assert!(value.key == "l1_finalized_block");
34+
Self { l1_finalized_block: value.value.parse().expect("invalid value") }
35+
}
36+
}

crates/database/db/src/models/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ pub mod block_data;
99

1010
/// This module contains the L1 message database model.
1111
pub mod l1_message;
12+
13+
/// This module contains the metadata model.
14+
pub mod metadata;

crates/database/db/src/operations.rs

Lines changed: 172 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::DatabaseConnectionProvider;
33
use alloy_primitives::B256;
44
use futures::{Stream, StreamExt};
55
use rollup_node_primitives::{
6-
BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages,
6+
BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, Metadata,
77
};
88
use scroll_alloy_rpc_types_engine::BlockDataHint;
99
use sea_orm::{
@@ -19,8 +19,20 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
1919
async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> {
2020
tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database.");
2121
let batch_commit: models::batch_commit::ActiveModel = batch_commit.into();
22-
batch_commit.insert(self.get_connection()).await?;
23-
Ok(())
22+
Ok(models::batch_commit::Entity::insert(batch_commit)
23+
.on_conflict(
24+
OnConflict::column(models::batch_commit::Column::Index)
25+
.update_columns(vec![
26+
models::batch_commit::Column::Hash,
27+
models::batch_commit::Column::BlockNumber,
28+
models::batch_commit::Column::BlockTimestamp,
29+
models::batch_commit::Column::FinalizedBlockNumber,
30+
])
31+
.to_owned(),
32+
)
33+
.exec(self.get_connection())
34+
.await
35+
.map(|_| ())?)
2436
}
2537

2638
/// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
@@ -68,6 +80,37 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
6880
.map(|x| x.map(Into::into))?)
6981
}
7082

83+
/// Set the latest finalized L1 block number.
84+
async fn set_latest_finalized_l1_block_number(
85+
&self,
86+
block_number: u64,
87+
) -> Result<(), DatabaseError> {
88+
tracing::trace!(target: "scroll::db", block_number, "Updating the latest finalized L1 block number in the database.");
89+
let metadata: models::metadata::ActiveModel =
90+
Metadata { l1_finalized_block: block_number }.into();
91+
Ok(models::metadata::Entity::insert(metadata)
92+
.on_conflict(
93+
OnConflict::column(models::metadata::Column::Key)
94+
.update_column(models::metadata::Column::Value)
95+
.to_owned(),
96+
)
97+
.exec(self.get_connection())
98+
.await
99+
.map(|_| ())?)
100+
}
101+
102+
/// Get the finalized L1 block number from the database.
103+
async fn get_finalized_l1_block_number(&self) -> Result<Option<u64>, DatabaseError> {
104+
Ok(models::metadata::Entity::find()
105+
.filter(models::metadata::Column::Key.eq("l1_finalized_block"))
106+
.select_only()
107+
.column(models::metadata::Column::Value)
108+
.into_tuple::<String>()
109+
.one(self.get_connection())
110+
.await
111+
.map(|x| x.and_then(|x| x.parse::<u64>().ok()))?)
112+
}
113+
71114
/// Get the newest finalized batch hash up to or at the provided height.
72115
async fn get_finalized_batch_hash_at_height(
73116
&self,
@@ -113,7 +156,23 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
113156
async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError> {
114157
tracing::trace!(target: "scroll::db", queue_index = l1_message.transaction.queue_index, "Inserting L1 message into database.");
115158
let l1_message: models::l1_message::ActiveModel = l1_message.into();
116-
l1_message.insert(self.get_connection()).await?;
159+
models::l1_message::Entity::insert(l1_message)
160+
.on_conflict(
161+
OnConflict::column(models::l1_message::Column::QueueIndex)
162+
.update_columns(vec![
163+
models::l1_message::Column::QueueHash,
164+
models::l1_message::Column::Hash,
165+
models::l1_message::Column::L1BlockNumber,
166+
models::l1_message::Column::GasLimit,
167+
models::l1_message::Column::To,
168+
models::l1_message::Column::Value,
169+
models::l1_message::Column::Sender,
170+
models::l1_message::Column::Input,
171+
])
172+
.to_owned(),
173+
)
174+
.exec(self.get_connection())
175+
.await?;
117176
Ok(())
118177
}
119178

@@ -208,15 +267,25 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
208267
})?)
209268
}
210269

211-
/// Get the latest safe L2 [`BlockInfo`] from the database.
212-
async fn get_latest_safe_l2_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
270+
/// Get the latest safe L2 ([`BlockInfo`], [`BatchInfo`]) from the database.
271+
async fn get_latest_safe_l2_info(
272+
&self,
273+
) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> {
213274
tracing::trace!(target: "scroll::db", "Fetching latest safe L2 block from database.");
214275
Ok(models::l2_block::Entity::find()
215276
.filter(models::l2_block::Column::BatchIndex.is_not_null())
216277
.order_by_desc(models::l2_block::Column::BlockNumber)
217278
.one(self.get_connection())
218279
.await
219-
.map(|x| x.map(|x| x.block_info()))?)
280+
.map(|x| {
281+
x.map(|x| {
282+
(
283+
x.block_info(),
284+
x.batch_info()
285+
.expect("Batch info must be present due to database query arguments"),
286+
)
287+
})
288+
})?)
220289
}
221290

222291
/// Get the latest L2 [`BlockInfo`] from the database.
@@ -229,6 +298,44 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
229298
.map(|x| x.map(|x| x.block_info()))?)
230299
}
231300

301+
/// Prepare the database on startup and return metadata used for other components in the
302+
/// rollup-node.
303+
///
304+
/// This method first unwinds the database to the finalized L1 block. It then fetches the batch
305+
/// info for the latest safe L2 block. It takes note of the L1 block number at which
306+
/// this batch was produced. It then retrieves the latest block for the previous batch
307+
/// (i.e., the batch before the latest safe block). It returns a tuple of this latest
308+
/// fetched block and the L1 block number of the batch.
309+
async fn prepare_on_startup(
310+
&self,
311+
genesis_hash: B256,
312+
) -> Result<(Option<BlockInfo>, Option<u64>), DatabaseError> {
313+
tracing::trace!(target: "scroll::db", "Fetching startup safe block from database.");
314+
let finalized_block_number = self.get_finalized_l1_block_number().await?.unwrap_or(0);
315+
self.unwind(genesis_hash, finalized_block_number).await?;
316+
let safe = if let Some(batch_info) = self
317+
.get_latest_safe_l2_info()
318+
.await?
319+
.map(|(_, batch_info)| batch_info)
320+
.filter(|b| b.index > 1)
321+
{
322+
let batch = self
323+
.get_batch_by_index(batch_info.index)
324+
.await?
325+
.expect("Batch info must be present due to database query arguments");
326+
let previous_batch = self
327+
.get_batch_by_index(batch_info.index - 1)
328+
.await?
329+
.expect("Batch info must be present due to database query arguments");
330+
let l2_block = self.get_highest_block_for_batch(previous_batch.hash).await?;
331+
(l2_block, Some(batch.block_number))
332+
} else {
333+
(None, None)
334+
};
335+
336+
Ok(safe)
337+
}
338+
232339
/// Delete all L2 blocks with a block number greater than the provided block number.
233340
async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
234341
tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
@@ -312,6 +419,64 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
312419
Ok(None)
313420
}
314421
}
422+
423+
/// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
424+
async fn unwind(
425+
&self,
426+
genesis_hash: B256,
427+
l1_block_number: u64,
428+
) -> Result<UnwindResult, DatabaseError> {
429+
// delete batch inputs and l1 messages
430+
let batches_removed = self.delete_batches_gt(l1_block_number).await?;
431+
let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?;
432+
433+
// filter and sort the executed L1 messages
434+
let mut removed_executed_l1_messages: Vec<_> =
435+
deleted_messages.into_iter().filter(|x| x.l2_block_number.is_some()).collect();
436+
removed_executed_l1_messages
437+
.sort_by(|a, b| a.transaction.queue_index.cmp(&b.transaction.queue_index));
438+
439+
// check if we need to reorg the L2 head and delete some L2 blocks
440+
let (queue_index, l2_head_block_info) =
441+
if let Some(msg) = removed_executed_l1_messages.first() {
442+
let l2_reorg_block_number = msg
443+
.l2_block_number
444+
.expect("we guarantee that this is Some(u64) due to the filter above") -
445+
1;
446+
let l2_block_info = self.get_l2_block_info_by_number(l2_reorg_block_number).await?;
447+
self.delete_l2_blocks_gt(l2_reorg_block_number).await?;
448+
(Some(msg.transaction.queue_index), l2_block_info)
449+
} else {
450+
(None, None)
451+
};
452+
453+
// check if we need to reorg the L2 safe block
454+
let l2_safe_block_info = if batches_removed > 0 {
455+
if let Some(x) = self.get_latest_safe_l2_info().await? {
456+
Some(x.0)
457+
} else {
458+
Some(BlockInfo::new(0, genesis_hash))
459+
}
460+
} else {
461+
None
462+
};
463+
464+
// commit the transaction
465+
Ok(UnwindResult { l1_block_number, queue_index, l2_head_block_info, l2_safe_block_info })
466+
}
467+
}
468+
469+
/// The result of [`DatabaseOperations::unwind`].
470+
#[derive(Debug)]
471+
pub struct UnwindResult {
472+
/// The L1 block number that we unwinded to.
473+
pub l1_block_number: u64,
474+
/// The latest unconsumed queue index after the uwnind.
475+
pub queue_index: Option<u64>,
476+
/// The L2 head block info after the unwind. This is only populated if the L2 head has reorged.
477+
pub l2_head_block_info: Option<BlockInfo>,
478+
/// The L2 safe block info after the unwind. This is only populated if the L2 safe has reorged.
479+
pub l2_safe_block_info: Option<BlockInfo>,
315480
}
316481

317482
impl<T> DatabaseOperations for T where T: DatabaseConnectionProvider {}

0 commit comments

Comments
 (0)