Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add worker interval config #42

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion configs/settlement.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,11 @@ private_key = "0xbe825d459385bbb3ba169ac8d59bd05b099b190d8a89aace7e5bc3518e2f1a9
chain_id = 12345

[ethereum_settlement_config.zeth_config.zeth_contracts_addr]
global_exit = "0xd0438bB2b2522D56d498F871fcf786006c180b64"
global_exit = "0xd0438bB2b2522D56d498F871fcf786006c180b64"

# A general configuration required for the settlement worker
[settlement_worker_config]
# unit: second
proof_worker_interval = 1
verify_worker_interval = 1
rollup_worker_interval = 1
12 changes: 12 additions & 0 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::custom_reth::TxFilterConfig;
use crate::db::lfs;
use crate::operator::Operator;
use crate::settlement::ethereum::EthereumSettlementConfig;
use crate::settlement::worker::WorkerConfig;
use crate::settlement::NetworkSpec;
use anyhow::{anyhow, Result};
use tokio::select;
Expand Down Expand Up @@ -194,6 +195,16 @@ impl RunCmd {
}
};

let settlement_worker_config = match &self.settlement_conf {
None => {
return Err(anyhow::anyhow!(
"Custom node configuration is required for custom node"
));
}

Some(settlement_conf_path) => WorkerConfig::from_conf_path(settlement_conf_path)?,
};

// Load the database configuration
let db_config = match self.base_params.databases.database {
Database::Memory => {
Expand Down Expand Up @@ -259,6 +270,7 @@ impl RunCmd {
a.as_str(),
stop_rx,
reth_started_signal_rx,
&settlement_worker_config,
)
.await
});
Expand Down
18 changes: 16 additions & 2 deletions src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ use ethers_providers::{Http, Provider};
// use serde::Serialize;
use crate::db::Database;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;

use crate::settlement::worker::Settler;
use crate::settlement::worker::{Settler, WorkerConfig};

pub(crate) struct Operator;

impl Operator {
#[allow(clippy::too_many_arguments)]
pub async fn run(
l2addr: &str,
prover_addr: &str,
Expand All @@ -26,6 +28,7 @@ impl Operator {
aggregator_addr: &str,
mut stop_rx: Receiver<()>,
mut reth_started_signal_rx: Receiver<()>,
general_config: &WorkerConfig,
) -> Result<()> {
// initialize all components of the eigen-zeth full node
// initialize the prover
Expand Down Expand Up @@ -59,32 +62,43 @@ impl Operator {
let arc_db_for_verify_worker = rollup_db.clone();
let settlement_provider_for_verify_worker = arc_settlement_provider.clone();
let (verify_stop_tx, verify_stop_rx) = mpsc::channel::<()>(1);
let verify_interval = Duration::from_secs(general_config.verify_worker_interval);
tokio::spawn(async move {
Settler::verify_worker(
arc_db_for_verify_worker,
settlement_provider_for_verify_worker,
verify_stop_rx,
verify_interval,
)
.await
});

// start the proof worker
let arc_db_for_proof_worker = rollup_db.clone();
let (proof_stop_tx, proof_stop_rx) = mpsc::channel::<()>(1);
let proof_interval = Duration::from_secs(general_config.proof_worker_interval);
tokio::spawn(async move {
Settler::proof_worker(arc_db_for_proof_worker, prover, proof_stop_rx).await
Settler::proof_worker(
arc_db_for_proof_worker,
prover,
proof_stop_rx,
proof_interval,
)
.await
});

let arc_db_for_submit_worker = rollup_db.clone();
let settlement_provider_for_submit_worker = arc_settlement_provider.clone();
let l2provider_for_submit_worker = l2provider.clone();
let (submit_stop_tx, submit_stop_rx) = mpsc::channel::<()>(1);
let rollup_interval = Duration::from_secs(general_config.rollup_worker_interval);
tokio::spawn(async move {
Settler::rollup(
arc_db_for_submit_worker,
l2provider_for_submit_worker,
settlement_provider_for_submit_worker,
submit_stop_rx,
rollup_interval,
)
.await
});
Expand Down
67 changes: 58 additions & 9 deletions src/settlement/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,44 @@ use crate::prover::ProverChannel;
use crate::settlement::{BatchData, Settlement};
use alloy_rlp::{length_of_length, BytesMut, Encodable, Header};
use anyhow::{anyhow, Result};
use config::{Config, File};
use ethers::prelude::U64;
use ethers_core::types::{BlockId, BlockNumber, Transaction};
use ethers_providers::{Http, Middleware, Provider};
use prost::bytes;
use reqwest::Client;
use reth_primitives::{Bytes, TransactionKind, TxLegacy};
use serde::Deserialize;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

const PROOF_INTERVAL: Duration = Duration::from_secs(30);
const VERIFY_INTERVAL: Duration = Duration::from_secs(30);
const SUBMIT_INTERVAL: Duration = Duration::from_secs(30);

pub(crate) struct Settler {}

/// A general configuration that needs to be included in the configuration structure of each implementation.
#[derive(Debug, Clone, Deserialize)]
pub struct WorkerConfig {
pub proof_worker_interval: u64,
pub verify_worker_interval: u64,
pub rollup_worker_interval: u64,
}

impl WorkerConfig {
pub fn from_conf_path(conf_path: &str) -> Result<Self> {
log::info!("Load the settlement worker config from: {}", conf_path);

let config = Config::builder()
.add_source(File::from(Path::new(conf_path)))
.build()
.map_err(|e| anyhow!("Failed to build config: {:?}", e))?;

config
.get("settlement_worker_config")
.map_err(|e| anyhow!("Failed to parse WorkerConfig: {:?}", e))
}
}

// TODO: Use channels, streams, and other methods to flow data between workers,
// and use event driven replacement of rotation databases to drive Zeth to run,
// avoiding frequent database access
Expand All @@ -29,8 +51,9 @@ impl Settler {
db: Arc<Box<dyn Database>>,
mut prover: ProverChannel,
mut stop_rx: mpsc::Receiver<()>,
worker_interval: Duration,
) -> Result<()> {
let mut ticker = tokio::time::interval(PROOF_INTERVAL);
let mut ticker = tokio::time::interval(worker_interval);
prover.start().await.unwrap();

log::info!("Prove Worker started");
Expand Down Expand Up @@ -141,8 +164,9 @@ impl Settler {
db: Arc<Box<dyn Database>>,
settlement_provider: Arc<Box<dyn Settlement>>,
mut stop_rx: mpsc::Receiver<()>,
worker_interval: Duration,
) -> Result<()> {
let mut ticker = tokio::time::interval(VERIFY_INTERVAL);
let mut ticker = tokio::time::interval(worker_interval);
let bridge_service_client = Client::new();
log::info!("Verify Worker started");
loop {
Expand Down Expand Up @@ -222,8 +246,9 @@ impl Settler {
l2provider: Provider<Http>,
settlement_provider: Arc<Box<dyn Settlement>>,
mut stop_rx: mpsc::Receiver<()>,
worker_interval: Duration,
) -> Result<()> {
let mut ticker = tokio::time::interval(SUBMIT_INTERVAL);
let mut ticker = tokio::time::interval(worker_interval);
log::info!("Submit Worker started");
loop {
tokio::select! {
Expand Down Expand Up @@ -531,7 +556,16 @@ mod tests {

let (tx, rx) = mpsc::channel(1);
let stop_rx = rx;
let submit_worker = Settler::rollup(arc_db, l2provider, arc_settlement_provider, stop_rx);

let conf_path = "configs/settlement.toml";
let config = WorkerConfig::from_conf_path(conf_path).unwrap();
let submit_worker = Settler::rollup(
arc_db,
l2provider,
arc_settlement_provider,
stop_rx,
Duration::from_secs(config.rollup_worker_interval),
);

tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down Expand Up @@ -608,7 +642,15 @@ mod tests {

let (tx, rx) = mpsc::channel(1);
let stop_rx: mpsc::Receiver<()> = rx;
let verify_worker = Settler::verify_worker(arc_db, arc_settlement_provider, stop_rx);

let conf_path = "configs/settlement.toml";
let config = WorkerConfig::from_conf_path(conf_path).unwrap();
let verify_worker = Settler::verify_worker(
arc_db,
arc_settlement_provider,
stop_rx,
Duration::from_secs(config.rollup_worker_interval),
);

tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down Expand Up @@ -640,4 +682,11 @@ mod tests {
);
Ok(())
}

#[test]
fn test_from_conf_path() {
let conf_path = "configs/settlement.toml";
let config = WorkerConfig::from_conf_path(conf_path).unwrap();
println!("{:#?}", config);
}
}
Loading