Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(vm-runner): make vm runner report time taken #2369

Merged
merged 7 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

13 changes: 10 additions & 3 deletions core/lib/dal/src/vm_runner_dal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use std::time::Instant;

use zksync_db_connection::{
connection::Connection, error::DalResult, instrument::InstrumentExt,
utils::duration_to_naive_time,
};
use zksync_types::L1BatchNumber;

use crate::Core;
Expand Down Expand Up @@ -68,15 +73,17 @@ impl VmRunnerDal<'_, '_> {
pub async fn mark_protective_reads_batch_as_completed(
&mut self,
l1_batch_number: L1BatchNumber,
started_at: Instant,
) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
vm_runner_protective_reads (l1_batch_number, created_at, updated_at)
vm_runner_protective_reads (l1_batch_number, created_at, updated_at, time_taken)
VALUES
($1, NOW(), NOW())
($1, NOW(), NOW(), $2)
"#,
i64::from(l1_batch_number.0),
duration_to_naive_time(started_at.elapsed()),
)
.instrument("mark_protective_reads_batch_as_completed")
.report_latency()
Expand Down
14 changes: 10 additions & 4 deletions core/node/metadata_calculator/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! Tests for the metadata calculator component life cycle.

use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};
use std::{
future::Future,
ops, panic,
path::Path,
sync::Arc,
time::{Duration, Instant},
};

use assert_matches::assert_matches;
use itertools::Itertools;
Expand Down Expand Up @@ -545,7 +551,7 @@ async fn test_postgres_backup_recovery(
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_without_metadata.number)
.mark_protective_reads_batch_as_completed(batch_without_metadata.number, Instant::now())
.await
.unwrap();
insert_initial_writes_for_batch(&mut storage, batch_without_metadata.number).await;
Expand Down Expand Up @@ -575,7 +581,7 @@ async fn test_postgres_backup_recovery(
.await
.unwrap();
txn.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_header.number)
.mark_protective_reads_batch_as_completed(batch_header.number, Instant::now())
.await
.unwrap();
insert_initial_writes_for_batch(&mut txn, batch_header.number).await;
Expand Down Expand Up @@ -812,7 +818,7 @@ pub(super) async fn extend_db_state_from_l1_batch(
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_number)
.mark_protective_reads_batch_as_completed(batch_number, Instant::now())
.await
.unwrap();
insert_initial_writes_for_batch(storage, batch_number).await;
Expand Down
5 changes: 3 additions & 2 deletions core/node/vm_runner/src/impls/protective_reads.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use anyhow::Context;
use async_trait::async_trait;
Expand Down Expand Up @@ -111,10 +111,11 @@ impl VmRunnerIo for ProtectiveReadsIo {
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
started_at: Instant,
) -> anyhow::Result<()> {
Ok(conn
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(l1_batch_number)
.mark_protective_reads_batch_as_completed(l1_batch_number, started_at)
.await?)
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/node/vm_runner/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, time::Instant};

use async_trait::async_trait;
use zksync_dal::{Connection, Core};
Expand Down Expand Up @@ -41,5 +41,6 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
started_at: Instant,
) -> anyhow::Result<()>;
}
16 changes: 8 additions & 8 deletions core/node/vm_runner/src/output_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
fmt::{Debug, Formatter},
mem,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Context;
Expand All @@ -18,7 +18,7 @@ use zksync_types::L1BatchNumber;

use crate::{metrics::METRICS, VmRunnerIo};

type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<()>>>;
type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<Instant>>>;

/// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch.
///
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<Io: VmRunnerIo, F: OutputHandlerFactory> OutputHandlerFactory
enum AsyncOutputHandler {
Running {
handler: Box<dyn StateKeeperOutputHandler>,
sender: oneshot::Sender<JoinHandle<anyhow::Result<()>>>,
sender: oneshot::Sender<JoinHandle<anyhow::Result<Instant>>>,
},
Finished,
}
Expand Down Expand Up @@ -173,10 +173,10 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
} => {
sender
.send(tokio::task::spawn(async move {
let latency = METRICS.output_handle_time.start();
let started_at = Instant::now();
let result = handler.handle_l1_batch(updates_manager).await;
latency.observe();
result
METRICS.output_handle_time.observe(started_at.elapsed());
result.map(|_| started_at)
}))
.ok();
Ok(())
Expand Down Expand Up @@ -243,13 +243,13 @@ impl<Io: VmRunnerIo> ConcurrentOutputHandlerFactoryTask<Io> {
.context("handler was dropped before the batch was fully processed")?;
// Wait until the handle is resolved, meaning that the `handle_l1_batch`
// computation has finished, and we can consider this batch to be completed
handle
let started_at = handle
.await
.context("failed to await for batch to be processed")??;
latest_processed_batch += 1;
let mut conn = self.pool.connection_tagged(self.io.name()).await?;
self.io
.mark_l1_batch_as_completed(&mut conn, latest_processed_batch)
.mark_l1_batch_as_completed(&mut conn, latest_processed_batch, started_at)
.await?;
METRICS
.last_processed_batch
Expand Down
8 changes: 7 additions & 1 deletion core/node/vm_runner/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, ops, sync::Arc, time::Duration};
use std::{
collections::HashMap,
ops,
sync::Arc,
time::{Duration, Instant},
};

use async_trait::async_trait;
use rand::{prelude::SliceRandom, Rng};
Expand Down Expand Up @@ -59,6 +64,7 @@ impl VmRunnerIo for Arc<RwLock<IoMock>> {
&self,
_conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
_started_at: Instant,
) -> anyhow::Result<()> {
self.write().await.current = l1_batch_number;
Ok(())
Expand Down
Loading