Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(vm-runner): make vm runner report time taken #2369

Merged
merged 7 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_protective_reads DROP COLUMN IF EXISTS processing_started_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_protective_reads ADD COLUMN IF NOT EXISTS processing_started_at TIME;
41 changes: 38 additions & 3 deletions core/lib/dal/src/vm_runner_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ impl VmRunnerDal<'_, '_> {
MAX(l1_batch_number) AS "last_processed_l1_batch"
FROM
vm_runner_protective_reads
WHERE
time_taken IS NOT NULL
"#
)
.instrument("get_protective_reads_latest_processed_batch")
Expand Down Expand Up @@ -46,6 +48,8 @@ impl VmRunnerDal<'_, '_> {
COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch"
FROM
vm_runner_protective_reads
WHERE
time_taken IS NOT NULL
)
SELECT
LEAST(last_batch, last_ready_batch) AS "last_ready_batch!"
Expand All @@ -63,23 +67,54 @@ impl VmRunnerDal<'_, '_> {
Ok(L1BatchNumber(row.last_ready_batch as u32))
}

pub async fn mark_protective_reads_batch_as_completed(
pub async fn mark_protective_reads_batch_as_processing(
&mut self,
l1_batch_number: L1BatchNumber,
) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
vm_runner_protective_reads (l1_batch_number, created_at, updated_at)
vm_runner_protective_reads (l1_batch_number, created_at, updated_at, processing_started_at)
VALUES
($1, NOW(), NOW())
($1, NOW(), NOW(), NOW())
ON CONFLICT (l1_batch_number) DO
UPDATE
SET
updated_at = NOW(),
processing_started_at = NOW()
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_protective_reads_batch_as_processing")
.report_latency()
.execute(self.storage)
.await?;
Ok(())
}

pub async fn mark_protective_reads_batch_as_completed(
&mut self,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
let update_result = sqlx::query!(
r#"
UPDATE vm_runner_protective_reads
SET
time_taken = NOW() - processing_started_at
WHERE
l1_batch_number = $1
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_protective_reads_batch_as_completed")
.report_latency()
.execute(self.storage)
.await?;
if update_result.rows_affected() == 0 {
anyhow::bail!(
"Trying to mark an L1 batch as completed while it is not being processed"
);
}
Ok(())
}

Expand Down
14 changes: 14 additions & 0 deletions core/node/metadata_calculator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,11 @@ async fn test_postgres_backup_recovery(
.insert_mock_l1_batch(batch_without_metadata)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_processing(batch_without_metadata.number)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_without_metadata.number)
Expand Down Expand Up @@ -575,6 +580,10 @@ async fn test_postgres_backup_recovery(
.insert_mock_l1_batch(batch_header)
.await
.unwrap();
txn.vm_runner_dal()
.mark_protective_reads_batch_as_processing(batch_header.number)
.await
.unwrap();
txn.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_header.number)
.await
Expand Down Expand Up @@ -811,6 +820,11 @@ pub(super) async fn extend_db_state_from_l1_batch(
.mark_l2_blocks_as_executed_in_l1_batch(batch_number)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_processing(batch_number)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_number)
Expand Down
14 changes: 12 additions & 2 deletions core/node/vm_runner/src/impls/protective_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,26 @@ impl VmRunnerIo for ProtectiveReadsIo {
.await?)
}

async fn mark_l1_batch_as_completed(
async fn mark_l1_batch_as_processing(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
Ok(conn
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(l1_batch_number)
.mark_protective_reads_batch_as_processing(l1_batch_number)
.await?)
}

async fn mark_l1_batch_as_completed(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
conn.vm_runner_dal()
.mark_protective_reads_batch_as_completed(l1_batch_number)
.await
}
}

#[derive(Debug)]
Expand Down
12 changes: 12 additions & 0 deletions core/node/vm_runner/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
conn: &mut Connection<'_, Core>,
) -> anyhow::Result<L1BatchNumber>;

/// Marks the specified batch as being in progress. Must be called before a batch can be marked
/// as completed.
///
/// # Errors
///
/// Propagates DB errors.
async fn mark_l1_batch_as_processing(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()>;

/// Marks the specified batch as the latest completed batch. All earlier batches are considered
/// to be completed too. No guarantees about later batches.
///
Expand Down
3 changes: 3 additions & 0 deletions core/node/vm_runner/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ impl VmRunner {
.create_handler(next_batch)
.await?;

self.io
.mark_l1_batch_as_processing(&mut self.pool.connection().await?, next_batch)
.await?;
let handle = tokio::task::spawn(Self::process_batch(
batch_executor,
batch_data.l2_blocks,
Expand Down
8 changes: 8 additions & 0 deletions core/node/vm_runner/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ impl VmRunnerIo for Arc<RwLock<IoMock>> {
Ok(io.current + io.max)
}

async fn mark_l1_batch_as_processing(
&self,
_conn: &mut Connection<'_, Core>,
_l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
Ok(())
}

async fn mark_l1_batch_as_completed(
&self,
_conn: &mut Connection<'_, Core>,
Expand Down
Loading