From a0bbae3858db36cd5cd1eb8c6bac408cfdff0396 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Thu, 29 Feb 2024 16:31:45 +0800 Subject: [PATCH] fix: add keep alive logic for xlinectl lock command Closes: #664 Signed-off-by: Phoeniix Zhao --- crates/xline-client/src/clients/lock.rs | 34 ++++++--------- crates/xline-client/src/lib.rs | 7 +-- crates/xline-client/src/types/lock.rs | 2 +- crates/xline/tests/it/lock_test.rs | 16 +++++-- .../xlinectl/src/command/lease/keep_alive.rs | 9 ++-- crates/xlinectl/src/command/lease/mod.rs | 2 +- crates/xlinectl/src/command/lock.rs | 43 +++++++++++++------ 7 files changed, 63 insertions(+), 50 deletions(-) diff --git a/crates/xline-client/src/clients/lock.rs b/crates/xline-client/src/clients/lock.rs index ed17b52c26..58cd03b60f 100644 --- a/crates/xline-client/src/clients/lock.rs +++ b/crates/xline-client/src/clients/lock.rs @@ -19,11 +19,9 @@ use xlineapi::{ }; use crate::{ - clients::{lease::LeaseClient, watch::WatchClient}, + clients::watch::WatchClient, error::{Result, XlineClientError}, - lease_gen::LeaseIdGenerator, types::{ - lease::LeaseGrantRequest, lock::{LockRequest, UnlockRequest}, watch::WatchRequest, }, @@ -35,8 +33,6 @@ use crate::{ pub struct LockClient { /// The client running the CURP protocol, communicate with all servers. curp_client: Arc, - /// The lease client - lease_client: LeaseClient, /// The watch client watch_client: WatchClient, /// Auth token @@ -47,7 +43,6 @@ impl Debug for LockClient { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LockClient") - .field("lease_client", &self.lease_client) .field("watch_client", &self.watch_client) .field("token", &self.token) .finish() @@ -59,15 +54,9 @@ impl Debug for LockClient { impl LockClient { /// Creates a new `LockClient` #[inline] - pub fn new( - curp_client: Arc, - channel: Channel, - token: Option, - id_gen: Arc, - ) -> Self { + pub fn new(curp_client: Arc, channel: Channel, token: Option) -> Self { Self { - curp_client: Arc::clone(&curp_client), - lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen), + curp_client, watch_client: WatchClient::new(channel, token.clone()), token, } @@ -84,6 +73,10 @@ impl LockClient { /// /// This function will return an error if the inner CURP client encountered a propose failure /// + /// # Panics + /// + /// Panic if the given `LockRequest.inner.lease` less than or equal to 0 + /// /// # Examples /// /// ```no_run @@ -116,14 +109,11 @@ impl LockClient { /// ``` #[inline] pub async fn lock(&self, request: LockRequest) -> Result { - let mut lease_id = request.inner.lease; - if lease_id == 0 { - let resp = self - .lease_client - .grant(LeaseGrantRequest::new(request.ttl)) - .await?; - lease_id = resp.id; - } + assert!( + request.inner.lease > 0, + "The LockRequest.lease_id should larger than 0" + ); + let lease_id = request.inner.lease; let prefix = format!( "{}/", String::from_utf8_lossy(&request.inner.name).into_owned() diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index e1bf11c5b0..e2243a5c4f 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -261,12 +261,7 @@ impl Client { token.clone(), Arc::clone(&id_gen), ); - let lock = LockClient::new( - Arc::clone(&curp_client), - channel.clone(), - token.clone(), - id_gen, - ); + let lock = LockClient::new(Arc::clone(&curp_client), channel.clone(), token.clone()); let auth = AuthClient::new(curp_client, channel.clone(), token.clone()); let maintenance = MaintenanceClient::new(channel.clone(), token.clone()); let cluster = ClusterClient::new(channel.clone(), token.clone()); diff --git a/crates/xline-client/src/types/lock.rs b/crates/xline-client/src/types/lock.rs index f150946d83..7b225e51cd 100644 --- a/crates/xline-client/src/types/lock.rs +++ b/crates/xline-client/src/types/lock.rs @@ -1,7 +1,7 @@ pub use xlineapi::{LockResponse, UnlockResponse}; /// Default session ttl -const DEFAULT_SESSION_TTL: i64 = 60; +pub const DEFAULT_SESSION_TTL: i64 = 60; /// Request for `Lock` #[derive(Debug, PartialEq)] diff --git a/crates/xline/tests/it/lock_test.rs b/crates/xline/tests/it/lock_test.rs index 97b17d6f48..18b557df65 100644 --- a/crates/xline/tests/it/lock_test.rs +++ b/crates/xline/tests/it/lock_test.rs @@ -5,7 +5,7 @@ use tokio::time::{self, timeout}; use xline_test_utils::{ types::{ lease::LeaseGrantRequest, - lock::{LockRequest, UnlockRequest}, + lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL}, }, Cluster, }; @@ -17,11 +17,16 @@ async fn test_lock() -> Result<(), Box> { cluster.start().await; let client = cluster.client().await; let lock_client = client.lock_client(); + let lease_id_1 = client + .lease_client() + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await? + .id; let lock_handle = tokio::spawn({ let c = lock_client.clone(); async move { - let res = c.lock(LockRequest::new("test")).await.unwrap(); + let res = c.lock(LockRequest::new("test").with_lease(lease_id_1)).await.unwrap(); time::sleep(Duration::from_secs(3)).await; let _res = c.unlock(UnlockRequest::new(res.key)).await.unwrap(); } @@ -29,7 +34,12 @@ async fn test_lock() -> Result<(), Box> { time::sleep(Duration::from_secs(1)).await; let now = time::Instant::now(); - let res = lock_client.lock(LockRequest::new("test")).await?; + let lease_id_2 = client + .lease_client() + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await? + .id; + let res = lock_client.lock(LockRequest::new("test").with_lease(lease_id_2)).await?; let elapsed = now.elapsed(); assert!(res.key.starts_with(b"test")); assert!(elapsed >= Duration::from_secs(1)); diff --git a/crates/xlinectl/src/command/lease/keep_alive.rs b/crates/xlinectl/src/command/lease/keep_alive.rs index a34884be98..43f1a95c28 100644 --- a/crates/xlinectl/src/command/lease/keep_alive.rs +++ b/crates/xlinectl/src/command/lease/keep_alive.rs @@ -43,7 +43,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result } else { tokio::select! { _ = ctrl_c() => {} - result = keep_alive_loop(keeper, stream) => { + result = keep_alive_loop(keeper, stream, true) => { return result; } } @@ -53,14 +53,17 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result } /// keep alive forever unless encounter error -async fn keep_alive_loop( +pub(crate) async fn keep_alive_loop( mut keeper: LeaseKeeper, mut stream: Streaming, + verbose: bool, ) -> Result<()> { loop { keeper.keep_alive()?; if let Some(resp) = stream.message().await? { - resp.print(); + if verbose { + resp.print(); + } if resp.ttl < 0 { return Err(XlineClientError::InvalidArgs(String::from( "lease keepalive response has negative ttl", diff --git a/crates/xlinectl/src/command/lease/mod.rs b/crates/xlinectl/src/command/lease/mod.rs index 7a3e06fe0e..ef02876445 100644 --- a/crates/xlinectl/src/command/lease/mod.rs +++ b/crates/xlinectl/src/command/lease/mod.rs @@ -6,7 +6,7 @@ use crate::handle_matches; /// `grant` command mod grant; /// `keep_alive` command -mod keep_alive; +pub(crate) mod keep_alive; /// `list` command mod list; /// `revoke` command diff --git a/crates/xlinectl/src/command/lock.rs b/crates/xlinectl/src/command/lock.rs index 219aae1f0a..da27fa2a48 100644 --- a/crates/xlinectl/src/command/lock.rs +++ b/crates/xlinectl/src/command/lock.rs @@ -1,12 +1,15 @@ use clap::{arg, ArgMatches, Command}; -use tokio::signal; +use tokio::signal::ctrl_c; use xline_client::{ error::Result, - types::lock::{LockRequest, UnlockRequest}, + types::{ + lease::{LeaseGrantRequest, LeaseKeepAliveRequest}, + lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL}, + }, Client, }; -use crate::utils::printer::Printer; +use crate::{lease::keep_alive::keep_alive_loop, utils::printer::Printer}; /// Definition of `lock` command pub(crate) fn command() -> Command { @@ -23,20 +26,32 @@ pub(crate) fn build_request(matches: &ArgMatches) -> LockRequest { /// Execute the command pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result<()> { - let req = build_request(matches); + let lease_client = client.lease_client(); + let lease_resp = lease_client + .grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL)) + .await?; + let lock_lease_id = lease_resp.id; + let req = build_request(matches).with_lease(lock_lease_id); + let lock_resp = client.lock_client().lock(req).await?; - let resp = client.lock_client().lock(req).await?; + lock_resp.print(); - resp.print(); + let (keeper, stream) = client + .lease_client() + .keep_alive(LeaseKeepAliveRequest::new(lock_lease_id)) + .await?; - signal::ctrl_c().await.expect("failed to listen for event"); - - println!("releasing the lock"); - - let unlock_req = UnlockRequest::new(resp.key); - let _unlock_resp = client.lock_client().unlock(unlock_req).await?; - - Ok(()) + tokio::select! { + _ = ctrl_c() => { + println!("releasing the lock"); + let unlock_req = UnlockRequest::new(lock_resp.key); + let _unlock_resp = client.lock_client().unlock(unlock_req).await?; + Ok(()) + } + result = keep_alive_loop(keeper, stream, false) => { + result + } + } } #[cfg(test)]