Skip to content

Commit

Permalink
Merge pull request #962 from bsbds/merge-refactor-v0.7
Browse files Browse the repository at this point in the history
Merge branch `refactor-v0.7.0-new`
  • Loading branch information
Phoenix500526 authored Aug 26, 2024
2 parents 617e341 + 5843ee5 commit d19df09
Show file tree
Hide file tree
Showing 80 changed files with 4,646 additions and 4,767 deletions.
1 change: 1 addition & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ retries = 0
slow-timeout = { period = "10s", terminate-after = 3 }
status-level = "all"
final-status-level = "slow"
fail-fast = true
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ madsim = { git = "https://github.com/LucienY01/madsim.git", branch = "bz/tonic-0
madsim-tonic = { git = "https://github.com/LucienY01/madsim.git", branch = "bz/tonic-0-12" }
madsim-tonic-build = { git = "https://github.com/LucienY01/madsim.git", branch = "bz/tonic-0-12" }
madsim-tokio = { git = "https://github.com/LucienY01/madsim.git", branch = "bz/tonic-0-12" }

1 change: 1 addition & 0 deletions crates/benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ anyhow = "1.0.83"
clap = { version = "4", features = ["derive"] }
clippy-utilities = "0.2.0"
etcd-client = { version = "0.14.0", features = ["tls"] }
futures = "0.3.30"
indicatif = "0.17.8"
rand = "0.8.5"
thiserror = "1.0.61"
Expand Down
16 changes: 10 additions & 6 deletions crates/benchmark/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use anyhow::Result;
use clippy_utilities::{NumericCast, OverflowArithmetic};
use futures::future::join_all;
use indicatif::ProgressBar;
use rand::RngCore;
use tokio::{
Expand Down Expand Up @@ -158,7 +159,6 @@ impl CommandRunner {

/// Create clients
async fn create_clients(&self) -> Result<Vec<BenchClient>> {
let mut clients = Vec::with_capacity(self.args.clients);
let client_options = ClientOptions::default().with_client_config(ClientConfig::new(
Duration::from_secs(10),
Duration::from_secs(5),
Expand All @@ -180,11 +180,15 @@ impl CommandRunner {
}
})
.collect::<Vec<_>>();
for _ in 0..self.args.clients {
let client =
BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone()).await?;
clients.push(client);
}
let clients_futs = std::iter::repeat_with(|| {
BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone())
})
.take(self.args.clients);
let clients = join_all(clients_futs)
.await
.into_iter()
.collect::<Result<_, _>>()?;

Ok(clients)
}

Expand Down
141 changes: 86 additions & 55 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<T> pri::Serializable for T where T: pri::ThreadSafe + Clone + Serialize + D
#[async_trait]
pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Error type
type Error: pri::Serializable + PbCodec + std::error::Error;
type Error: pri::Serializable + PbCodec + std::error::Error + Clone;

/// K (key) is used to tell confliction
///
Expand All @@ -50,48 +50,17 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Returns `true` if the command is read-only
fn is_read_only(&self) -> bool;

/// Prepare the command
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::prepare` goes wrong
#[inline]
fn prepare<E>(&self, e: &E) -> Result<Self::PR, Self::Error>
where
E: CommandExecutor<Self> + Send + Sync,
{
<E as CommandExecutor<Self>>::prepare(e, self)
}

/// Execute the command according to the executor
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::execute` goes wrong
#[inline]
async fn execute<E>(&self, e: &E) -> Result<Self::ER, Self::Error>
where
E: CommandExecutor<Self> + Send + Sync,
{
<E as CommandExecutor<Self>>::execute(e, self).await
}

/// Execute the command after_sync callback
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::after_sync` goes wrong
#[inline]
async fn after_sync<E>(
&self,
e: &E,
index: LogIndex,
prepare_res: Self::PR,
) -> Result<Self::ASR, Self::Error>
fn execute<E>(&self, e: &E) -> Result<Self::ER, Self::Error>
where
E: CommandExecutor<Self> + Send + Sync,
{
<E as CommandExecutor<Self>>::after_sync(e, self, index, prepare_res).await
<E as CommandExecutor<Self>>::execute(e, self)
}
}

Expand Down Expand Up @@ -127,40 +96,42 @@ pub trait CommandExecutor<C>: pri::ThreadSafe
where
C: Command,
{
/// Prepare the command
/// Execute the command
///
/// # Errors
///
/// This function may return an error if there is a problem preparing the command.
fn prepare(&self, cmd: &C) -> Result<C::PR, C::Error>;
/// This function may return an error if there is a problem executing the
/// command.
fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;

/// Execute the command
/// Execute the read-only command
///
/// # Errors
///
/// This function may return an error if there is a problem executing the command.
async fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;
/// This function may return an error if there is a problem executing the
/// command.
fn execute_ro(&self, cmd: &C) -> Result<(C::ER, C::ASR), C::Error>;

/// Execute the after_sync callback
///
/// # Errors
/// Batch execute the after_sync callback
///
/// This function may return an error if there is a problem executing the after_sync callback.
async fn after_sync(
/// This `highest_index` means the last log index of the `cmds`
fn after_sync(
&self,
cmd: &C,
index: LogIndex,
prepare_res: C::PR,
) -> Result<C::ASR, C::Error>;
cmds: Vec<AfterSyncCmd<'_, C>>,
// might be `None` if it's a speculative execution
highest_index: Option<LogIndex>,
) -> Vec<Result<AfterSyncOk<C>, C::Error>>;

/// Set the index of the last log entry that has been successfully applied to the command executor
/// Set the index of the last log entry that has been successfully applied
/// to the command executor
///
/// # Errors
///
/// Returns an error if setting the last applied log entry fails.
fn set_last_applied(&self, index: LogIndex) -> Result<(), C::Error>;

/// Get the index of the last log entry that has been successfully applied to the command executor
/// Get the index of the last log entry that has been successfully applied
/// to the command executor
///
/// # Errors
///
Expand All @@ -171,17 +142,21 @@ where
///
/// # Errors
///
/// This function may return an error if there is a problem taking a snapshot.
/// This function may return an error if there is a problem taking a
/// snapshot.
async fn snapshot(&self) -> Result<Snapshot, C::Error>;

/// Reset the command executor using the snapshot or to the initial state if None
/// Reset the command executor using the snapshot or to the initial state if
/// None
///
/// # Errors
///
/// This function may return an error if there is a problem resetting the command executor.
/// This function may return an error if there is a problem resetting the
/// command executor.
async fn reset(&self, snapshot: Option<(Snapshot, LogIndex)>) -> Result<(), C::Error>;

/// Trigger the barrier of the given trigger id (based on propose id) and log index.
/// Trigger the barrier of the given trigger id (based on propose id) and
/// log index.
fn trigger(&self, id: InflightId);
}

Expand Down Expand Up @@ -215,3 +190,59 @@ impl From<DecodeError> for PbSerializeError {
PbSerializeError::RpcDecode(err)
}
}

#[allow(clippy::module_name_repetitions)]
/// After sync command type
#[derive(Debug)]
pub struct AfterSyncCmd<'a, C> {
/// The command
cmd: &'a C,
/// Whether the command needs to be executed in after sync stage
to_execute: bool,
}

impl<'a, C> AfterSyncCmd<'a, C> {
/// Creates a new `AfterSyncCmd`
#[inline]
pub fn new(cmd: &'a C, to_execute: bool) -> Self {
Self { cmd, to_execute }
}

/// Gets the command
#[inline]
#[must_use]
pub fn cmd(&self) -> &'a C {
self.cmd
}

/// Convert self into parts
#[inline]
#[must_use]
pub fn into_parts(&'a self) -> (&'a C, bool) {
(self.cmd, self.to_execute)
}
}

/// Ok type of the after sync result
#[derive(Debug)]
pub struct AfterSyncOk<C: Command> {
/// After Sync Result
asr: C::ASR,
/// Optional Execution Result
er_opt: Option<C::ER>,
}

impl<C: Command> AfterSyncOk<C> {
/// Creates a new [`AfterSyncOk<C>`].
#[inline]
pub fn new(asr: C::ASR, er_opt: Option<C::ER>) -> Self {
Self { asr, er_opt }
}

/// Decomposes `AfterSyncOk` into its constituent parts.
#[inline]
pub fn into_parts(self) -> (C::ASR, Option<C::ER>) {
let Self { asr, er_opt } = self;
(asr, er_opt)
}
}
Loading

0 comments on commit d19df09

Please sign in to comment.