diff --git a/crates/xline-client/src/clients/lock.rs b/crates/xline-client/src/clients/lock.rs index ed17b52c2..58cd03b60 100644 --- a/crates/xline-client/src/clients/lock.rs +++ b/crates/xline-client/src/clients/lock.rs @@ -19,11 +19,9 @@ use xlineapi::{ }; use crate::{ - clients::{lease::LeaseClient, watch::WatchClient}, + clients::watch::WatchClient, error::{Result, XlineClientError}, - lease_gen::LeaseIdGenerator, types::{ - lease::LeaseGrantRequest, lock::{LockRequest, UnlockRequest}, watch::WatchRequest, }, @@ -35,8 +33,6 @@ use crate::{ pub struct LockClient { /// The client running the CURP protocol, communicate with all servers. curp_client: Arc, - /// The lease client - lease_client: LeaseClient, /// The watch client watch_client: WatchClient, /// Auth token @@ -47,7 +43,6 @@ impl Debug for LockClient { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LockClient") - .field("lease_client", &self.lease_client) .field("watch_client", &self.watch_client) .field("token", &self.token) .finish() @@ -59,15 +54,9 @@ impl Debug for LockClient { impl LockClient { /// Creates a new `LockClient` #[inline] - pub fn new( - curp_client: Arc, - channel: Channel, - token: Option, - id_gen: Arc, - ) -> Self { + pub fn new(curp_client: Arc, channel: Channel, token: Option) -> Self { Self { - curp_client: Arc::clone(&curp_client), - lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen), + curp_client, watch_client: WatchClient::new(channel, token.clone()), token, } @@ -84,6 +73,10 @@ impl LockClient { /// /// This function will return an error if the inner CURP client encountered a propose failure /// + /// # Panics + /// + /// Panic if the given `LockRequest.inner.lease` less than or equal to 0 + /// /// # Examples /// /// ```no_run @@ -116,14 +109,11 @@ impl LockClient { /// ``` #[inline] pub async fn lock(&self, request: LockRequest) -> Result { - let mut lease_id = request.inner.lease; - if lease_id == 0 { - let resp = self - .lease_client - .grant(LeaseGrantRequest::new(request.ttl)) - .await?; - lease_id = resp.id; - } + assert!( + request.inner.lease > 0, + "The LockRequest.lease_id should larger than 0" + ); + let lease_id = request.inner.lease; let prefix = format!( "{}/", String::from_utf8_lossy(&request.inner.name).into_owned() diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index 11f780cdc..61dda6f34 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -269,12 +269,7 @@ impl Client { token.clone(), Arc::clone(&id_gen), ); - let lock = LockClient::new( - Arc::clone(&curp_client), - channel.clone(), - token.clone(), - id_gen, - ); + let lock = LockClient::new(Arc::clone(&curp_client), channel.clone(), token.clone()); let auth = AuthClient::new(curp_client, channel.clone(), token.clone()); let maintenance = MaintenanceClient::new(channel.clone(), token.clone()); let cluster = ClusterClient::new(channel.clone(), token.clone()); diff --git a/crates/xline-client/src/types/lock.rs b/crates/xline-client/src/types/lock.rs index f150946d8..7b225e51c 100644 --- a/crates/xline-client/src/types/lock.rs +++ b/crates/xline-client/src/types/lock.rs @@ -1,7 +1,7 @@ pub use xlineapi::{LockResponse, UnlockResponse}; /// Default session ttl -const DEFAULT_SESSION_TTL: i64 = 60; +pub const DEFAULT_SESSION_TTL: i64 = 60; /// Request for `Lock` #[derive(Debug, PartialEq)] diff --git a/crates/xline-client/tests/it/lock.rs b/crates/xline-client/tests/it/lock.rs index 279b60bc5..a41a4bad9 100644 --- a/crates/xline-client/tests/it/lock.rs +++ b/crates/xline-client/tests/it/lock.rs @@ -2,21 +2,36 @@ use std::time::Duration; use test_macros::abort_on_panic; use xline_client::{ + clients::LeaseClient, error::Result, - types::lock::{LockRequest, UnlockRequest}, + types::{ + lease::LeaseGrantRequest, + lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL}, + }, }; use super::common::get_cluster_client; +async fn gen_lease_id(client: LeaseClient, ttl: i64) -> i64 { + client + .grant(LeaseGrantRequest::new(ttl)) + .await + .expect("grant lease should be success") + .id +} + #[tokio::test(flavor = "multi_thread")] async fn lock_unlock_should_success_in_normal_path() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); + let lock_client = client.lock_client(); + let lease_id = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; - let resp = client.lock(LockRequest::new("lock-test")).await?; + let resp = lock_client + .lock(LockRequest::new("lock-test").with_lease(lease_id)) + .await?; assert!(resp.key.starts_with(b"lock-test/")); - client.unlock(UnlockRequest::new(resp.key)).await?; + lock_client.unlock(UnlockRequest::new(resp.key)).await?; Ok(()) } @@ -24,31 +39,44 @@ async fn lock_unlock_should_success_in_normal_path() -> Result<()> { #[abort_on_panic] async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); + let lock_client = client.lock_client(); + let lease_id_1 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; + let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; + let client_c = client.clone(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let resp = client.lock(LockRequest::new("lock-test")).await.unwrap(); + let resp = lock_client + .lock(LockRequest::new("lock-test").with_lease(lease_id_1)) + .await + .unwrap(); let handle = tokio::spawn(async move { let res = tokio::time::timeout( Duration::from_secs(2), - client_c.lock(LockRequest::new("lock-test")), + client_c + .lock_client() + .lock(LockRequest::new("lock-test").with_lease(lease_id_2)), ) .await; assert!(res.is_err()); let _ignore = tx.send(()); - + let lease_id_3 = gen_lease_id(client_c.lease_client(), DEFAULT_SESSION_TTL).await; let res = tokio::time::timeout( Duration::from_millis(200), - client_c.lock(LockRequest::new("lock-test")), + client_c + .lock_client() + .lock(LockRequest::new("lock-test").with_lease(lease_id_3)), ) .await; assert!(res.is_ok_and(|r| r.is_ok_and(|resp| resp.key.starts_with(b"lock-test/")))); }); rx.recv().await.unwrap(); - let _resp = client.unlock(UnlockRequest::new(resp.key)).await.unwrap(); + let _resp = lock_client + .unlock(UnlockRequest::new(resp.key)) + .await + .unwrap(); handle.await.unwrap(); @@ -59,16 +87,18 @@ async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> { #[abort_on_panic] async fn lock_should_timeout_when_ttl_is_set() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); + let lock_client = client.lock_client(); + let lease_id_1 = gen_lease_id(client.lease_client(), 1).await; - let _resp = client - .lock(LockRequest::new("lock-test").with_ttl(1)) + let _resp = lock_client + .lock(LockRequest::new("lock-test").with_lease(lease_id_1)) .await .unwrap(); + let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; let resp = tokio::time::timeout( Duration::from_secs(2), - client.lock(LockRequest::new("lock-test")), + lock_client.lock(LockRequest::new("lock-test").with_lease(lease_id_2)), ) .await .expect("timeout when trying to lock")?; @@ -82,26 +112,32 @@ async fn lock_should_timeout_when_ttl_is_set() -> Result<()> { #[abort_on_panic] async fn lock_should_unlock_after_cancelled() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); - let client_c = client.clone(); + let lock_client = client.lock_client(); + let client_c = lock_client.clone(); + let lease_id_1 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; + let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; + let lease_id_3 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await; // first acquire the lock - let resp = client.lock(LockRequest::new("lock-test")).await.unwrap(); + let resp = lock_client + .lock(LockRequest::new("lock-test").with_lease(lease_id_1)) + .await + .unwrap(); // acquire the lock again and then cancel it let res = tokio::time::timeout( Duration::from_secs(1), - client_c.lock(LockRequest::new("lock-test")), + client_c.lock(LockRequest::new("lock-test").with_lease(lease_id_2)), ) .await; assert!(res.is_err()); // unlock the first one - client.unlock(UnlockRequest::new(resp.key)).await?; + lock_client.unlock(UnlockRequest::new(resp.key)).await?; // try lock again, it should success let resp = tokio::time::timeout( Duration::from_secs(1), - client.lock(LockRequest::new("lock-test")), + lock_client.lock(LockRequest::new("lock-test").with_lease(lease_id_3)), ) .await .expect("timeout when trying to lock")?; diff --git a/crates/xline-test-utils/src/bin/validation_lock_client.rs b/crates/xline-test-utils/src/bin/validation_lock_client.rs index ab8f6ccec..4baa78bab 100644 --- a/crates/xline-test-utils/src/bin/validation_lock_client.rs +++ b/crates/xline-test-utils/src/bin/validation_lock_client.rs @@ -3,7 +3,10 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use xline_client::{ - types::lock::{LockRequest, UnlockRequest}, + types::{ + lease::LeaseGrantRequest, + lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL}, + }, Client, ClientOptions, }; @@ -40,16 +43,23 @@ async fn main() -> Result<()> { } else { args.endpoints }; - let client = Client::connect(endpoints, ClientOptions::default()) + let client = Client::connect(endpoints, ClientOptions::default()).await?; + let lock_client = client.lock_client(); + let lease_client = client.lease_client(); + let lease_id = lease_client + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) .await? - .lock_client(); + .id; + match args.command { Commands::Lock { name } => { - let lock_res = client.lock(LockRequest::new(name)).await?; + let lock_res = lock_client + .lock(LockRequest::new(name).with_lease(lease_id)) + .await?; println!("{}", String::from_utf8_lossy(&lock_res.key)) } Commands::Unlock { key } => { - let _unlock_res = client.unlock(UnlockRequest::new(key)).await?; + let _unlock_res = lock_client.unlock(UnlockRequest::new(key)).await?; println!("unlock success"); } }; diff --git a/crates/xline/tests/it/lock_test.rs b/crates/xline/tests/it/lock_test.rs index 97b17d6f4..ec7fda8b3 100644 --- a/crates/xline/tests/it/lock_test.rs +++ b/crates/xline/tests/it/lock_test.rs @@ -5,7 +5,7 @@ use tokio::time::{self, timeout}; use xline_test_utils::{ types::{ lease::LeaseGrantRequest, - lock::{LockRequest, UnlockRequest}, + lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL}, }, Cluster, }; @@ -17,11 +17,19 @@ async fn test_lock() -> Result<(), Box> { cluster.start().await; let client = cluster.client().await; let lock_client = client.lock_client(); + let lease_id_1 = client + .lease_client() + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await? + .id; let lock_handle = tokio::spawn({ let c = lock_client.clone(); async move { - let res = c.lock(LockRequest::new("test")).await.unwrap(); + let res = c + .lock(LockRequest::new("test").with_lease(lease_id_1)) + .await + .unwrap(); time::sleep(Duration::from_secs(3)).await; let _res = c.unlock(UnlockRequest::new(res.key)).await.unwrap(); } @@ -29,7 +37,14 @@ async fn test_lock() -> Result<(), Box> { time::sleep(Duration::from_secs(1)).await; let now = time::Instant::now(); - let res = lock_client.lock(LockRequest::new("test")).await?; + let lease_id_2 = client + .lease_client() + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await? + .id; + let res = lock_client + .lock(LockRequest::new("test").with_lease(lease_id_2)) + .await?; let elapsed = now.elapsed(); assert!(res.key.starts_with(b"test")); assert!(elapsed >= Duration::from_secs(1)); @@ -46,18 +61,23 @@ async fn test_lock_timeout() -> Result<(), Box> { let client = cluster.client().await; let lock_client = client.lock_client(); - let lease_id = client + let lease_id_1 = client .lease_client() .grant(LeaseGrantRequest::new(1)) .await? .id; let _res = lock_client - .lock(LockRequest::new("test").with_lease(lease_id)) + .lock(LockRequest::new("test").with_lease(lease_id_1)) .await?; + let lease_id_2 = client + .lease_client() + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await? + .id; let res = timeout( Duration::from_secs(3), - lock_client.lock(LockRequest::new("test")), + lock_client.lock(LockRequest::new("test").with_lease(lease_id_2)), ) .await??; assert!(res.key.starts_with(b"test")); diff --git a/crates/xlinectl/src/command/lease/keep_alive.rs b/crates/xlinectl/src/command/lease/keep_alive.rs index 7e65c0339..397e5ef3a 100644 --- a/crates/xlinectl/src/command/lease/keep_alive.rs +++ b/crates/xlinectl/src/command/lease/keep_alive.rs @@ -44,7 +44,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result } else { tokio::select! { _ = ctrl_c() => {} - result = keep_alive_loop(keeper, stream) => { + result = keep_alive_loop(keeper, stream, true) => { return result; } } @@ -54,14 +54,17 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result } /// keep alive forever unless encounter error -async fn keep_alive_loop( +pub(crate) async fn keep_alive_loop( mut keeper: LeaseKeeper, mut stream: Streaming, + verbose: bool, ) -> Result<()> { loop { keeper.keep_alive()?; if let Some(resp) = stream.message().await? { - resp.print(); + if verbose { + resp.print(); + } if resp.ttl < 0 { return Err(XlineClientError::InvalidArgs(String::from( "lease keepalive response has negative ttl", diff --git a/crates/xlinectl/src/command/lease/mod.rs b/crates/xlinectl/src/command/lease/mod.rs index 7a3e06fe0..ef0287644 100644 --- a/crates/xlinectl/src/command/lease/mod.rs +++ b/crates/xlinectl/src/command/lease/mod.rs @@ -6,7 +6,7 @@ use crate::handle_matches; /// `grant` command mod grant; /// `keep_alive` command -mod keep_alive; +pub(crate) mod keep_alive; /// `list` command mod list; /// `revoke` command diff --git a/crates/xlinectl/src/command/lock.rs b/crates/xlinectl/src/command/lock.rs index 219aae1f0..da27fa2a4 100644 --- a/crates/xlinectl/src/command/lock.rs +++ b/crates/xlinectl/src/command/lock.rs @@ -1,12 +1,15 @@ use clap::{arg, ArgMatches, Command}; -use tokio::signal; +use tokio::signal::ctrl_c; use xline_client::{ error::Result, - types::lock::{LockRequest, UnlockRequest}, + types::{ + lease::{LeaseGrantRequest, LeaseKeepAliveRequest}, + lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL}, + }, Client, }; -use crate::utils::printer::Printer; +use crate::{lease::keep_alive::keep_alive_loop, utils::printer::Printer}; /// Definition of `lock` command pub(crate) fn command() -> Command { @@ -23,20 +26,32 @@ pub(crate) fn build_request(matches: &ArgMatches) -> LockRequest { /// Execute the command pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result<()> { - let req = build_request(matches); + let lease_client = client.lease_client(); + let lease_resp = lease_client + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await?; + let lock_lease_id = lease_resp.id; + let req = build_request(matches).with_lease(lock_lease_id); + let lock_resp = client.lock_client().lock(req).await?; - let resp = client.lock_client().lock(req).await?; + lock_resp.print(); - resp.print(); + let (keeper, stream) = client + .lease_client() + .keep_alive(LeaseKeepAliveRequest::new(lock_lease_id)) + .await?; - signal::ctrl_c().await.expect("failed to listen for event"); - - println!("releasing the lock"); - - let unlock_req = UnlockRequest::new(resp.key); - let _unlock_resp = client.lock_client().unlock(unlock_req).await?; - - Ok(()) + tokio::select! { + _ = ctrl_c() => { + println!("releasing the lock"); + let unlock_req = UnlockRequest::new(lock_resp.key); + let _unlock_resp = client.lock_client().unlock(unlock_req).await?; + Ok(()) + } + result = keep_alive_loop(keeper, stream, false) => { + result + } + } } #[cfg(test)]