Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge branch refactor-v0.7.0-new #962

Merged
merged 97 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
f9368cf
refactor: xline store
bsbds Apr 8, 2024
8a38a5b
refactor: after sync command
bsbds Apr 7, 2024
4ab054d
refactor: revision fallback
bsbds Apr 9, 2024
9b78057
refactor: execute in after sync
bsbds Apr 10, 2024
d5ff7d8
refactor: add CE to CurpNode
bsbds Mar 1, 2024
11ea107
chore(madsim): remove CE from curp server creation
bsbds May 29, 2024
93db8b6
feat(wal): add mock implementation of WAL
bsbds Jul 9, 2024
73e7f48
chore(wal): move wal mod.rs to storage.rs
bsbds Jul 12, 2024
2c574d8
refactor(wal): add proxy type for WALStorage
bsbds Jul 12, 2024
e28817c
refactor: curp storage api
bsbds Jul 12, 2024
b134b2d
fix: recover wal in tests
bsbds Jul 12, 2024
3157abf
chore: remove unecessary trait bound for WAL
bsbds Jul 26, 2024
fe71e78
chore: add a comment to the recover behavior of the curp db
bsbds Jul 26, 2024
c6024f0
refactor: curp cmd worker
bsbds Apr 7, 2024
c6b9be4
chore: resolve comments
bsbds Jul 30, 2024
60c3b2f
chore: fix clippy
bsbds Aug 5, 2024
4229606
refactor: curp client streaming
bsbds Apr 8, 2024
b816b09
fix: add client side timeout to the rpc connect
bsbds Jun 12, 2024
d9f3865
chore: change tonic::Result<T> to Result<T, tonic::Status> as madsim …
bsbds May 29, 2024
890966c
chore(madsim): update madsim curp client api
bsbds May 29, 2024
e2ea316
chore: clippy raw curp
bsbds May 16, 2024
ba5dbab
chore: clippy cmd worker
bsbds May 16, 2024
0e8ebb9
chore: resolve comments
bsbds Aug 1, 2024
e6a77bd
chore: move locking into `remove_from_sp_ucp`
bsbds Aug 1, 2024
b7c110e
chore: correct comment location
bsbds Aug 1, 2024
54726a7
chore: remove outdated todos
bsbds Aug 2, 2024
27e1789
fix: only remove from conflict pools in after sync stage
bsbds Aug 2, 2024
d5842da
refactor(xline, curp): switch after sync to sync
bsbds May 27, 2024
3cd7a50
refactor(xline, curp): after sync error returning mechanism
bsbds May 24, 2024
1b611dd
chore: fix clippy
bsbds Aug 5, 2024
825ae1b
refactor(madsim): update ProposeRequest
bsbds May 29, 2024
0ca4f64
fix: watch server test put
bsbds May 28, 2024
1f8c5cf
refactor(curp-client): refresh state when leader is missing
bsbds Aug 6, 2024
46d2cef
chore: remove unused tracing in test
bsbds Aug 8, 2024
085aa7c
refactor: rewrite `AsResultStates`
bsbds Aug 8, 2024
1d1ebd1
chore: rename `map_results` to `for_each_none_result`
bsbds Aug 9, 2024
5c49328
fix(madsim): disable sync wait for compaction in madsim tests
bsbds Aug 8, 2024
8e060f7
fix: persistent empty log entry after becomes the leader
bsbds Aug 7, 2024
07c19fa
fix: simulation curp group
bsbds Aug 7, 2024
d17a33e
fix(madsim): curp madsim tests
bsbds Aug 7, 2024
b767d75
refactor: exclude configuration change entries from conflict pools
bsbds Apr 29, 2024
f9ee351
fix: membership change
bsbds May 15, 2024
a117656
chore: allow pass by value
bsbds Aug 12, 2024
5d371e1
chore: fix clippy
bsbds Aug 12, 2024
6b3fb6d
feat: implement dedup
bsbds Apr 29, 2024
0ae13d2
chore: revert update to nextest.toml
bsbds Aug 13, 2024
f41e9de
chore: exit heartbeat task on the client when the cluster is shutting…
bsbds May 20, 2024
c350133
chore: fix naming of client count
bsbds Aug 13, 2024
aa7551e
chore: move debug info before sleep
bsbds Aug 13, 2024
4f2b6c4
refactor: set a max retry count for
bsbds Aug 13, 2024
26dc48b
chore: remoev unused imports
bsbds Aug 13, 2024
ac5e81c
refactor: use fixed retry interval in
bsbds Aug 13, 2024
d885ba4
refactor: set retry count to a higher value to avoid test failures
bsbds Aug 14, 2024
4525dbd
refactor(task_manager): directly abort cancel safe task
bsbds May 21, 2024
9827929
fix: kv updates task exit before after sync task
bsbds May 28, 2024
0919fdc
refactor: record er and asr for recovered commands
bsbds Apr 30, 2024
1cff8aa
fix: drop lock early to prevent deadlock
bsbds Aug 15, 2024
5f4b611
refactor: cmd board gc
bsbds Apr 30, 2024
04e03e3
chore: make code reusable
bsbds Aug 16, 2024
a3bbf70
chore: fix typos and clippy
bsbds Aug 16, 2024
87287fe
chore: use retain
bsbds Aug 16, 2024
23ba3d8
feat: implement conflict pool gc
bsbds May 6, 2024
01e46d6
chore: fix typos
bsbds Aug 15, 2024
93f6d71
feat: implement no-op wait on leader for read-only commands
bsbds Jul 2, 2024
8e92f1e
feat: implement read index on server and client
bsbds Jul 2, 2024
7e081fd
fix: wait no-op log at term 1
bsbds Jul 12, 2024
0402724
test: add tests for read index
bsbds Jul 9, 2024
05eda9d
fix: add missing functions
bsbds Aug 19, 2024
b34af83
refactor: enable optional returns after sync results for read only co…
bsbds Aug 14, 2024
70b97ec
fix: remove duplicate code in after sync
bsbds Aug 15, 2024
d9a1817
chore: add doc of the meaning of `highest_index` in `after_sync`
bsbds Aug 16, 2024
5e8cdd6
chore: remove tasks that no longer exist
bsbds Aug 19, 2024
5c87755
chore: remove unused imports
bsbds Aug 19, 2024
5415aa3
fix: check leader transfer in lease keep alive
bsbds May 27, 2024
40e55f2
fix: update madsim to fix stream early close issue
bsbds Aug 7, 2024
3577254
fix: Cargo.lock
bsbds Aug 19, 2024
4c99c46
fix: madsim tests
bsbds Aug 23, 2024
fdb5bb2
test: rewrite tests for curp client
bsbds Jul 8, 2024
debbdab
fix: exe_exactly_once_on_leader will only test on leader
bsbds Aug 15, 2024
ba555d5
fix: concurrent_cmd_order_should_have_correct_revision timeout due to…
bsbds Aug 15, 2024
a26b114
fix: execute early before after sync
bsbds Aug 15, 2024
b2caa6e
refactor: disable fast path completely in etcd competible server
bsbds Aug 15, 2024
d225871
fix: use after sync txn and index in lease revoke
bsbds Aug 20, 2024
46ae7d3
chore: remove unecessary txn usage
bsbds Aug 20, 2024
6b627c4
fix: lease store revision generation
bsbds Aug 20, 2024
7bf01d5
fix: use execute_ro to speculative execute read-only commands
bsbds Aug 23, 2024
c6d7d9b
chore: use join_all to concurrently build clients in benchmark
bsbds Aug 16, 2024
c4f1dcb
fix: remove check_members
bsbds Aug 23, 2024
313b819
fix: generate propose id inside client retry closure
bsbds Aug 7, 2024
2ba7ae1
refactor: use synchronous compaction in `sync_compaction`
bsbds Aug 23, 2024
47a8900
chore: update cargo hakari
bsbds Aug 23, 2024
1157cbd
Merge branch 'fix-tests' into merge-refactor-v0.7
bsbds Aug 23, 2024
7d0bc8f
chore: remove unused index from lease_store
bsbds Aug 26, 2024
cad274f
Merge branch 'fix-tests' into merge-refactor-v0.7
bsbds Aug 26, 2024
a31b2f2
Merge remote-tracking branch 'upstream/master' into merge-refactor-v0.7
bsbds Aug 26, 2024
6f909f5
fix: potential panic in shutdown listener
bsbds Aug 26, 2024
5843ee5
fix: only return shutdown error on cluster shutdown
bsbds Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: after sync command
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>

chore: add reminder of a revision issue

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds authored and Phoenix500526 committed Jul 25, 2024
commit 8a38a5bc7786a45a142bd8aac8056cb5cc3f3320
59 changes: 31 additions & 28 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<T> pri::Serializable for T where T: pri::ThreadSafe + Clone + Serialize + D
#[async_trait]
pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Error type
type Error: pri::Serializable + PbCodec + std::error::Error;
type Error: pri::Serializable + PbCodec + std::error::Error + Clone;

/// K (key) is used to tell confliction
///
Expand Down Expand Up @@ -75,24 +75,6 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
{
<E as CommandExecutor<Self>>::execute(e, self).await
}

/// Execute the command after_sync callback
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::after_sync` goes wrong
#[inline]
async fn after_sync<E>(
&self,
e: &E,
index: LogIndex,
prepare_res: Self::PR,
) -> Result<Self::ASR, Self::Error>
where
E: CommandExecutor<Self> + Send + Sync,
{
<E as CommandExecutor<Self>>::after_sync(e, self, index, prepare_res).await
}
}

/// Check conflict of two keys
Expand Down Expand Up @@ -141,17 +123,12 @@ where
/// This function may return an error if there is a problem executing the command.
async fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;

/// Execute the after_sync callback
///
/// # Errors
///
/// This function may return an error if there is a problem executing the after_sync callback.
/// Batch execute the after_sync callback
async fn after_sync(
&self,
cmd: &C,
index: LogIndex,
prepare_res: C::PR,
) -> Result<C::ASR, C::Error>;
cmds: Vec<AfterSyncCmd<'_, C>>,
highest_index: LogIndex,
) -> Result<Vec<(C::ASR, Option<C::ER>)>, C::Error>;

/// Set the index of the last log entry that has been successfully applied to the command executor
///
Expand Down Expand Up @@ -215,3 +192,29 @@ impl From<DecodeError> for PbSerializeError {
PbSerializeError::RpcDecode(err)
}
}

/// After sync command type
#[derive(Debug)]
pub struct AfterSyncCmd<'a, C> {
/// The command
cmd: &'a C,
/// Whether the command needs to be executed in after sync stage
to_exectue: bool,
}

impl<'a, C> AfterSyncCmd<'a, C> {
/// Creates a new `AfterSyncCmd`
pub fn new(cmd: &'a C, to_exectue: bool) -> Self {
Self { cmd, to_exectue }
}

/// Gets the command
pub fn cmd(&self) -> &'a C {
self.cmd
}

/// Convert self into parts
pub fn into_parts(self) -> (&'a C, bool) {
(self.cmd, self.to_exectue)
}
}
97 changes: 59 additions & 38 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use async_trait::async_trait;
use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec},
cmd::{AfterSyncCmd, Command, CommandExecutor, ConflictCheck, PbCodec},
InflightId, LogIndex,
};
use engine::{
Expand Down Expand Up @@ -307,51 +307,72 @@ impl CommandExecutor<TestCommand> for TestCE {

async fn after_sync(
&self,
cmd: &TestCommand,
index: LogIndex,
revision: <TestCommand as Command>::PR,
) -> Result<<TestCommand as Command>::ASR, <TestCommand as Command>::Error> {
sleep(cmd.as_dur).await;
if cmd.as_should_fail {
cmds: Vec<AfterSyncCmd<'_, TestCommand>>,
highest_index: LogIndex,
) -> Result<
Vec<(
<TestCommand as Command>::ASR,
Option<<TestCommand as Command>::ER>,
)>,
<TestCommand as Command>::Error,
> {
let as_duration = cmds
.iter()
.fold(Duration::default(), |acc, c| acc + c.cmd().as_dur);
sleep(as_duration).await;
if cmds.iter().any(|c| c.cmd().as_should_fail) {
return Err(ExecuteError("fail".to_owned()));
}
self.after_sync_sender
.send((cmd.clone(), index))
.expect("failed to send after sync msg");
let total = cmds.len();
for (i, cmd) in cmds.iter().enumerate() {
let index = highest_index - (total - i - 1) as u64;
self.after_sync_sender
.send((cmd.cmd().clone(), index))
.expect("failed to send after sync msg");
}
let mut wr_ops = vec![WriteOperation::new_put(
META_TABLE,
APPLIED_INDEX_KEY.into(),
index.to_le_bytes().to_vec(),
highest_index.to_le_bytes().to_vec(),
)];
if let TestCommandType::Put(v) = cmd.cmd_type {
debug!("cmd {:?}-{:?} revision is {}", cmd.cmd_type, cmd, revision);
let value = v.to_le_bytes().to_vec();
let keys = cmd
.keys
.iter()
.map(|k| k.to_le_bytes().to_vec())
.collect_vec();
wr_ops.extend(
keys.clone()
.into_iter()
.map(|key| WriteOperation::new_put(TEST_TABLE, key, value.clone()))
.chain(keys.into_iter().map(|key| {
WriteOperation::new_put(
REVISION_TABLE,
key,
revision.to_le_bytes().to_vec(),
)
})),

let mut asrs = Vec::new();
for (i, c) in cmds.iter().enumerate() {
let cmd = c.cmd();
let index = highest_index - (total - i) as u64;
asrs.push((LogIndexResult(index), None));
if let TestCommandType::Put(v) = cmd.cmd_type {
let revision = self.revision.fetch_add(1, Ordering::Relaxed);
debug!("cmd {:?}-{:?} revision is {}", cmd.cmd_type, cmd, revision);
let value = v.to_le_bytes().to_vec();
let keys = cmd
.keys
.iter()
.map(|k| k.to_le_bytes().to_vec())
.collect_vec();
wr_ops.extend(
keys.clone()
.into_iter()
.map(|key| WriteOperation::new_put(TEST_TABLE, key, value.clone()))
.chain(keys.into_iter().map(|key| {
WriteOperation::new_put(
REVISION_TABLE,
key,
revision.to_le_bytes().to_vec(),
)
})),
);
}
debug!(
"{} after sync cmd({:?} - {:?}), index: {index}",
self.server_name, cmd.cmd_type, cmd
);
self.store
.write_multi(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
}
debug!(
"{} after sync cmd({:?} - {:?}), index: {index}",
self.server_name, cmd.cmd_type, cmd
);
Ok(index.into())

self.store
.write_multi(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
Ok(asrs)
}

fn set_last_applied(&self, index: LogIndex) -> Result<(), <TestCommand as Command>::Error> {
Expand Down
Loading