From 4bb65412d73f853833b259b309cddf12f2f6f772 Mon Sep 17 00:00:00 2001 From: yongman Date: Thu, 8 Sep 2022 18:44:19 +0800 Subject: [PATCH] Add a new begin_latest_read api for transaction support Signed-off-by: yongman --- src/transaction/client.rs | 33 ++++++++++++++++++++++---- src/transaction/transaction.rs | 40 ++++++++++++++++++++++---------- tests/integration_tests.rs | 28 +++++++++++++++++++++- tikv-client-common/src/errors.rs | 3 +++ 4 files changed, 86 insertions(+), 18 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 500a38d8..c8f1fbcc 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -276,12 +276,35 @@ impl Client { Ok(res) } - pub fn new_transaction( - &self, - timestamp: Timestamp, - options: TransactionOptions, - ) -> Transaction { + fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { let logger = self.logger.new(o!("child" => 1)); Transaction::new(timestamp, self.pd.clone(), options, logger) } + + /// Creates a new latest commit readonly [`Transaction`]. + /// + /// Read operations will read the latest commit data which is not a snapshot read. + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::{Config, TransactionClient}; + /// # use futures::prelude::*; + /// # use tikv_client::TransactionOptions; + /// # futures::executor::block_on(async { + /// let client = TransactionClient::new(vec!["192.168.0.100"], None) + /// .await + /// .unwrap(); + /// let options = TransactionOptions::new_optimistic(); + /// let mut transaction = client.begin_latest_read(options); + /// // ... Issue some reads. + /// # }); + /// ``` + pub fn begin_latest_read(&self, options: TransactionOptions) -> Transaction { + debug!( + self.logger, + "creating new latest commit readonly transaction" + ); + self.new_transaction(Timestamp::from_version(u64::MAX), options.read_only()) + } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 30f9602a..55320761 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -113,7 +113,7 @@ impl Transaction { /// ``` pub async fn get(&mut self, key: impl Into) -> Result> { debug!(self.logger, "invoking transactional get request"); - self.check_allow_operation().await?; + self.check_allow_operation(true).await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); let key = key.into(); @@ -177,7 +177,7 @@ impl Transaction { /// ``` pub async fn get_for_update(&mut self, key: impl Into) -> Result> { debug!(self.logger, "invoking transactional get_for_update request"); - self.check_allow_operation().await?; + self.check_allow_operation(false).await?; if !self.is_pessimistic() { let key = key.into(); self.lock_keys(iter::once(key.clone())).await?; @@ -244,7 +244,7 @@ impl Transaction { keys: impl IntoIterator>, ) -> Result> { debug!(self.logger, "invoking transactional batch_get request"); - self.check_allow_operation().await?; + self.check_allow_operation(true).await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); let retry_options = self.options.retry_options.clone(); @@ -299,7 +299,7 @@ impl Transaction { self.logger, "invoking transactional batch_get_for_update request" ); - self.check_allow_operation().await?; + self.check_allow_operation(false).await?; let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); if !self.is_pessimistic() { self.lock_keys(keys.clone()).await?; @@ -433,7 +433,7 @@ impl Transaction { /// ``` pub async fn put(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!(self.logger, "invoking transactional put request"); - self.check_allow_operation().await?; + self.check_allow_operation(false).await?; let key = key.into(); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) @@ -464,7 +464,7 @@ impl Transaction { /// ``` pub async fn insert(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!(self.logger, "invoking transactional insert request"); - self.check_allow_operation().await?; + self.check_allow_operation(false).await?; let key = key.into(); if self.buffer.get(&key).is_some() { return Err(Error::DuplicateKeyInsertion); @@ -499,7 +499,7 @@ impl Transaction { /// ``` pub async fn delete(&mut self, key: impl Into) -> Result<()> { debug!(self.logger, "invoking transactional delete request"); - self.check_allow_operation().await?; + self.check_allow_operation(false).await?; let key = key.into(); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) @@ -537,7 +537,7 @@ impl Transaction { keys: impl IntoIterator>, ) -> Result<()> { debug!(self.logger, "invoking transactional lock_keys request"); - self.check_allow_operation().await?; + self.check_allow_operation(false).await?; match self.options.kind { TransactionKind::Optimistic => { for key in keys { @@ -569,6 +569,15 @@ impl Transaction { /// ``` pub async fn commit(&mut self) -> Result> { debug!(self.logger, "commiting transaction"); + + { + // readonly transaction no need to commit + let status = self.status.read().await; + if *status == TransactionStatus::ReadOnly { + return Ok(None); + } + } + { let mut status = self.status.write().await; if !matches!( @@ -677,7 +686,7 @@ impl Transaction { #[doc(hidden)] pub async fn send_heart_beat(&mut self) -> Result { debug!(self.logger, "sending heart_beat"); - self.check_allow_operation().await?; + self.check_allow_operation(true).await?; let primary_key = match self.buffer.get_primary_key() { Some(k) => k, None => return Err(Error::NoPrimaryKey), @@ -703,7 +712,7 @@ impl Transaction { key_only: bool, reverse: bool, ) -> Result> { - self.check_allow_operation().await?; + self.check_allow_operation(true).await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); let retry_options = self.options.retry_options.clone(); @@ -840,10 +849,17 @@ impl Transaction { } /// Checks if the transaction can perform arbitrary operations. - async fn check_allow_operation(&self) -> Result<()> { + async fn check_allow_operation(&self, readonly: bool) -> Result<()> { let status = self.status.read().await; match *status { - TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()), + TransactionStatus::Active => Ok(()), + TransactionStatus::ReadOnly => { + if readonly { + Ok(()) + } else { + Err(Error::OperationReadOnlyError) + } + } TransactionStatus::Committed | TransactionStatus::Rolledback | TransactionStatus::StartedCommit diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 9d174aad..371326f5 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -17,9 +17,10 @@ use futures::prelude::*; use rand::{seq::IteratorRandom, thread_rng, Rng}; use serial_test::serial; use std::{ + assert_eq, collections::{HashMap, HashSet}, convert::TryInto, - iter, + iter, matches, }; use tikv_client::{ transaction::HeartbeatOption, BoundRange, Error, Key, KvPair, RawClient, Result, Transaction, @@ -942,6 +943,31 @@ async fn txn_key_exists() -> Result<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn txn_latest_read() -> Result<()> { + init().await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?; + let key = "key".to_owned(); + let value = "value".to_owned(); + let options = TransactionOptions::new_optimistic(); + let mut t1 = client.begin_latest_read(options); + t1.get(key.clone()).await?; + t1.put(key.clone(), value.clone()) + .await + .map_err(|_e| matches!(Error::OperationReadOnlyError, _e)) + .unwrap_err(); + // commit is no needed for readonly transaction, commit() will take no effect if called. + t1.commit().await?; + + let options = TransactionOptions::new_pessimistic(); + let mut t2 = client.begin_latest_read(options); + t2.get(key.clone()).await?; + // t2.commit().await?; + + Ok(()) +} + // helper function async fn get_u32(client: &RawClient, key: Vec) -> Result { let x = client.get(key).await?.unwrap(); diff --git a/tikv-client-common/src/errors.rs b/tikv-client-common/src/errors.rs index db309107..531adcc2 100644 --- a/tikv-client-common/src/errors.rs +++ b/tikv-client-common/src/errors.rs @@ -22,6 +22,9 @@ pub enum Error { /// It's not allowed to perform operations in a transaction after it has been committed or rolled back. #[error("Cannot read or write data after any attempt to commit or roll back the transaction")] OperationAfterCommitError, + /// It's not allowed to perform write operation in a readonly transaction. + #[error("Cannot write data in read-only transaction")] + OperationReadOnlyError, /// We tried to use 1pc for a transaction, but it didn't work. Probably should have used 2pc. #[error("1PC transaction could not be committed.")] OnePcFailure,