Skip to content

Commit

Permalink
introduce specialization for API v1
Browse files Browse the repository at this point in the history
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
  • Loading branch information
iosmanthus committed Jun 20, 2022
1 parent 503047f commit dd28398
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 243 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
//! # })}
//! ```

#![feature(specialization)]
#[macro_use]
pub mod request;
#[macro_use]
Expand Down
96 changes: 57 additions & 39 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
compat::stream_fn,
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader},
region_cache::RegionCache,
store::RegionStore,
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
};
use std::{collections::HashMap, sync::Arc, thread};
use std::marker::PhantomData;

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use grpcio::{EnvBuilder, Environment};
use slog::Logger;
use std::{collections::HashMap, sync::Arc, thread};
use tokio::sync::RwLock;

use tikv_client_pd::Cluster;
use tikv_client_proto::{kvrpcpb, metapb};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
use tokio::sync::RwLock;

use crate::{
BoundRange,
compat::stream_fn,
Config,
Key,
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader}, region_cache::RegionCache, Result, SecurityManager, store::RegionStore, Timestamp,
};
use crate::request::request_codec::RequestCodec;

const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
Expand All @@ -42,6 +47,7 @@ const CLIENT_PREFIX: &str = "tikv-client";
#[async_trait]
pub trait PdClient: Send + Sync + 'static {
type KvClient: KvClient + Send + Sync + 'static;
type RequestCodec: RequestCodec;

/// In transactional API, `region` is decoded (keys in raw format).
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore>;
Expand Down Expand Up @@ -69,11 +75,11 @@ pub trait PdClient: Send + Sync + 'static {

fn group_keys_by_region<K, K2>(
self: Arc<Self>,
keys: impl Iterator<Item = K> + Send + Sync + 'static,
keys: impl Iterator<Item=K> + Send + Sync + 'static,
) -> BoxStream<'static, Result<(RegionId, Vec<K2>)>>
where
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
K2: Send + Sync + 'static,
where
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
K2: Send + Sync + 'static,
{
let keys = keys.peekable();
stream_fn(keys, move |mut keys| {
Expand All @@ -95,7 +101,7 @@ pub trait PdClient: Send + Sync + 'static {
}
}
})
.boxed()
.boxed()
}

/// Returns a Stream which iterates over the contexts for each region covered by range.
Expand Down Expand Up @@ -126,7 +132,7 @@ pub trait PdClient: Send + Sync + 'static {
Ok(Some((Some(region_end), store)))
}
})
.boxed()
.boxed()
}

/// Returns a Stream which iterates over the contexts for ranges in the same region.
Expand Down Expand Up @@ -190,7 +196,7 @@ pub trait PdClient: Send + Sync + 'static {
}
}
})
.boxed()
.boxed()
}

fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
Expand All @@ -204,22 +210,27 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

fn get_request_codec(&self) -> Self::RequestCodec;
}

/// This client converts requests for the logical TiKV cluster into requests
/// for a single TiKV store using PD and internal logic.
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
// TODO: change to a real codec.
_phantom: PhantomData<C>,
}

#[async_trait]
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<C, KvC> {
type KvClient = KvC::KvClient;
type RequestCodec = C;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
let store_id = region.get_store_id()?;
Expand Down Expand Up @@ -260,15 +271,19 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

fn get_request_codec(&self) -> Self::RequestCodec {
todo!()
}
}

impl PdRpcClient<TikvConnect, Cluster> {
impl<C> PdRpcClient<C, TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient> {
) -> Result<PdRpcClient<C, TikvConnect, Cluster>> {
PdRpcClient::new(
config.clone(),
|env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout),
Expand All @@ -278,7 +293,7 @@ impl PdRpcClient<TikvConnect, Cluster> {
enable_codec,
logger,
)
.await
.await
}
}

Expand All @@ -291,18 +306,18 @@ fn thread_name(prefix: &str) -> String {
.unwrap_or_else(|| prefix.to_owned())
}

impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
) -> Result<PdRpcClient<C, KvC, Cl>>
where
PdFut: Future<Output=Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
{
let env = Arc::new(
EnvBuilder::new()
Expand All @@ -312,7 +327,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
);
let security_mgr = Arc::new(
if let (Some(ca_path), Some(cert_path), Some(key_path)) =
(&config.ca_path, &config.cert_path, &config.key_path)
(&config.ca_path, &config.cert_path, &config.key_path)
{
SecurityManager::load(ca_path, cert_path, key_path)?
} else {
Expand All @@ -329,6 +344,8 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
enable_codec,
region_cache: RegionCache::new(pd),
logger,
// TODO
_phantom: PhantomData,
})
}

Expand All @@ -352,10 +369,11 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {

#[cfg(test)]
pub mod test {
use super::*;
use futures::{executor, executor::block_on};

use crate::mock::*;

use futures::{executor, executor::block_on};
use super::*;

#[tokio::test]
async fn test_kv_client_caching() {
Expand Down Expand Up @@ -396,7 +414,7 @@ pub mod test {
vec![1].into(),
vec![2].into(),
vec![3].into(),
vec![5, 2].into()
vec![5, 2].into(),
]
);
assert_eq!(
Expand Down Expand Up @@ -458,36 +476,36 @@ pub mod test {
vec![
kvrpcpb::KeyRange {
start_key: k1.clone(),
end_key: k2.clone()
end_key: k2.clone(),
},
kvrpcpb::KeyRange {
start_key: k1,
end_key: k_split.clone()
}
end_key: k_split.clone(),
},
]
);
assert_eq!(ranges2.0, 2);
assert_eq!(
ranges2.1,
vec![kvrpcpb::KeyRange {
start_key: k_split.clone(),
end_key: k3
end_key: k3,
}]
);
assert_eq!(ranges3.0, 1);
assert_eq!(
ranges3.1,
vec![kvrpcpb::KeyRange {
start_key: k2,
end_key: k_split.clone()
end_key: k_split.clone(),
}]
);
assert_eq!(ranges4.0, 2);
assert_eq!(
ranges4.1,
vec![kvrpcpb::KeyRange {
start_key: k_split,
end_key: k4
end_key: k4,
}]
);
assert!(stream.next().is_none());
Expand Down
8 changes: 5 additions & 3 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
request::{Collect, CollectSingle, Plan},
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};
use crate::request::request_codec::RequestCodec;

const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;

Expand All @@ -26,15 +27,16 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
/// awaited to execute.
#[derive(Clone)]
pub struct Client<PdC: PdClient = PdRpcClient> {
pub struct Client<C, PdC: PdClient = PdRpcClient<C>> {
rpc: Arc<PdC>,
cf: Option<ColumnFamily>,
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
logger: Logger,
_phantom: std::marker::PhantomData<C>,
}

impl Client<PdRpcClient> {
impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
/// Create a raw [`Client`] and connect to the TiKV cluster.
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
Expand Down Expand Up @@ -158,7 +160,7 @@ impl Client<PdRpcClient> {
}
}

impl<PdC: PdClient> Client<PdC> {
impl<C:RequestCodec, PdC: PdClient> Client<C, PdC> {
/// Create a new 'get' request.
///
/// Once resolved this request will result in the fetching of the value associated with the
Expand Down
13 changes: 7 additions & 6 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
//! generated protobuf code, then calls the low-level ctor functions in the requests module.

use std::{iter::Iterator, ops::Range, sync::Arc};
use std::marker::PhantomData;

use tikv_client_proto::{kvrpcpb, metapb};

use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value};
use crate::{BoundRange, ColumnFamily, Key, KvPair, raw::requests, Value};

pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
requests::new_raw_get_request(key.into(), cf)
}

pub fn new_raw_batch_get_request(
keys: impl Iterator<Item = Key>,
keys: impl Iterator<Item=Key>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchGetRequest {
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
Expand All @@ -31,7 +32,7 @@ pub fn new_raw_put_request(
}

pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
pairs: impl Iterator<Item=KvPair>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
Expand All @@ -47,7 +48,7 @@ pub fn new_raw_delete_request(
}

pub fn new_raw_batch_delete_request(
keys: impl Iterator<Item = Key>,
keys: impl Iterator<Item=Key>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawBatchDeleteRequest {
requests::new_raw_batch_delete_request(keys.map(Into::into).collect(), cf)
Expand Down Expand Up @@ -78,7 +79,7 @@ pub fn new_raw_scan_request(
}

pub fn new_raw_batch_scan_request(
ranges: impl Iterator<Item = BoundRange>,
ranges: impl Iterator<Item=BoundRange>,
each_limit: u32,
key_only: bool,
cf: Option<ColumnFamily>,
Expand All @@ -98,7 +99,7 @@ pub fn new_cas_request(
pub fn new_raw_coprocessor_request(
copr_name: String,
copr_version_req: String,
ranges: impl Iterator<Item = BoundRange>,
ranges: impl Iterator<Item=BoundRange>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
) -> requests::RawCoprocessorRequest {
requests::new_raw_coprocessor_request(
Expand Down
Loading

0 comments on commit dd28398

Please sign in to comment.