Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add super batch for gRPC request and add more configures for client #363

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
52 changes: 50 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use serde_derive::{Deserialize, Serialize};
use std::{path::PathBuf, time::Duration};
use tikv_client_store::KvClientConfig;

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
/// The configuration for either a [`RawClient`](crate::RawClient) or a
/// [`TransactionClient`](crate::TransactionClient).
///
Expand All @@ -16,17 +18,17 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub kv_config: KvClientConfig,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);

impl Default for Config {
fn default() -> Self {
Config {
ca_path: None,
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
kv_config: KvClientConfig::default(),
}
}
}
Expand Down Expand Up @@ -80,4 +82,50 @@ impl Config {
self.timeout = timeout;
self
}

// TODO: add more config options for tivk client config
pub fn with_kv_timeout(mut self, timeout: u64) -> Self {
self.kv_config.request_timeout = timeout;
self
}

pub fn with_kv_completion_queue_size(mut self, size: usize) -> Self {
self.kv_config.completion_queue_size = size;
self
}

pub fn with_kv_grpc_keepalive_time(mut self, time: u64) -> Self {
self.kv_config.grpc_keepalive_time = time;
self
}

pub fn with_kv_grpc_keepalive_timeout(mut self, timeout: u64) -> Self {
self.kv_config.grpc_keepalive_timeout = timeout;
self
}

pub fn with_kv_allow_batch(mut self, allow_batch: bool) -> Self {
self.kv_config.allow_batch = allow_batch;
self
}

pub fn with_kv_overload_threshold(mut self, threshold: u64) -> Self {
self.kv_config.overload_threshold = threshold;
self
}

pub fn with_kv_max_batch_wait_time(mut self, wait: u64) -> Self {
self.kv_config.max_batch_wait_time = wait;
self
}

pub fn with_kv_max_batch_size(mut self, size: usize) -> Self {
self.kv_config.max_batch_size = size;
self
}

pub fn with_kv_max_inflight_requests(mut self, requests: usize) -> Self {
self.kv_config.max_inflight_requests = requests;
self
}
}
12 changes: 7 additions & 5 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use derive_new::new;
use slog::{Drain, Logger};
use std::{any::Any, sync::Arc};
use tikv_client_proto::metapb;
use tikv_client_store::{KvClient, KvConnect, Request};
use tikv_client_store::{KvClient, KvClientConfig, KvConnect, Request};

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
Expand Down Expand Up @@ -52,13 +52,15 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
#[derive(new, Default, Clone)]
pub struct MockKvClient {
pub addr: String,
dispatch: Option<Arc<dyn Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static>>,
dispatch: Option<
Arc<dyn Fn(&(dyn Any + Send)) -> Result<Box<dyn Any + Send>> + Send + Sync + 'static>,
>,
}

impl MockKvClient {
pub fn with_dispatch_hook<F>(dispatch: F) -> MockKvClient
where
F: Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static,
F: Fn(&(dyn Any + Send)) -> Result<Box<dyn Any + Send>> + Send + Sync + 'static,
{
MockKvClient {
addr: String::new(),
Expand All @@ -78,7 +80,7 @@ pub struct MockPdClient {

#[async_trait]
impl KvClient for MockKvClient {
async fn dispatch(&self, req: &dyn Request) -> Result<Box<dyn Any>> {
async fn dispatch(&self, req: Box<dyn Request>) -> Result<Box<dyn Any + Send>> {
match &self.dispatch {
Some(f) => f(req.as_any()),
None => panic!("no dispatch hook set"),
Expand All @@ -89,7 +91,7 @@ impl KvClient for MockKvClient {
impl KvConnect for MockKvConnect {
type KvClient = MockKvClient;

fn connect(&self, address: &str) -> Result<Self::KvClient> {
fn connect(&self, address: &str, _: KvClientConfig) -> Result<Self::KvClient> {
Ok(MockKvClient {
addr: address.to_owned(),
dispatch: None,
Expand Down
9 changes: 5 additions & 4 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use slog::Logger;
use std::{collections::HashMap, sync::Arc, thread};
use tikv_client_pd::Cluster;
use tikv_client_proto::{kvrpcpb, metapb};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
use tikv_client_store::{KvClient, KvClientConfig, KvConnect, TikvConnect};
use tokio::sync::RwLock;

const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";

/// The PdClient handles all the encoding stuff.
Expand Down Expand Up @@ -210,6 +209,7 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
kv_config: KvClientConfig,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
Expand Down Expand Up @@ -304,7 +304,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
{
let env = Arc::new(
EnvBuilder::new()
.cq_count(CQ_COUNT)
.cq_count(config.kv_config.completion_queue_size)
.name_prefix(thread_name(CLIENT_PREFIX))
.build(),
);
Expand All @@ -324,6 +324,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
kv_config: config.kv_config,
enable_codec,
region_cache: RegionCache::new(pd),
logger,
Expand All @@ -335,7 +336,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
return Ok(client.clone());
};
info!(self.logger, "connect to tikv endpoint: {:?}", address);
match self.kv_connect.connect(address) {
match self.kv_connect.connect(address, self.kv_config.clone()) {
Ok(client) => {
self.kv_client_cache
.write()
Expand Down
16 changes: 12 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{str::FromStr, sync::Arc, u32};
use slog::{Drain, Logger};
use tikv_client_common::Error;
use tikv_client_proto::metapb;
use tikv_client_store::KvClientConfig;

use crate::{
backoff::DEFAULT_REGION_BACKOFF,
Expand All @@ -31,6 +32,7 @@ pub struct Client<PdC: PdClient = PdRpcClient> {
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
logger: Logger,
kv_config: KvClientConfig,
}

impl Clone for Client {
Expand All @@ -40,6 +42,7 @@ impl Clone for Client {
cf: self.cf.clone(),
atomic: self.atomic,
logger: self.logger.clone(),
kv_config: self.kv_config.clone(),
}
}
}
Expand Down Expand Up @@ -106,13 +109,15 @@ impl Client<PdRpcClient> {
});
debug!(logger, "creating new raw client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config, false, logger.clone()).await?);
let rpc = Arc::new(
PdRpcClient::connect(&pd_endpoints, config.clone(), false, logger.clone()).await?,
);
Ok(Client {
rpc,
cf: None,
atomic: false,
logger,
kv_config: config.kv_config,
})
}

Expand Down Expand Up @@ -147,6 +152,7 @@ impl Client<PdRpcClient> {
cf: Some(cf),
atomic: self.atomic,
logger: self.logger.clone(),
kv_config: self.kv_config.clone(),
}
}

Expand All @@ -164,6 +170,7 @@ impl Client<PdRpcClient> {
cf: self.cf.clone(),
atomic: true,
logger: self.logger.clone(),
kv_config: self.kv_config.clone(),
}
}
}
Expand Down Expand Up @@ -773,15 +780,15 @@ mod tests {
o!(),
);
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
move |req: &dyn Any| {
move |req: &(dyn Any + Send)| {
if let Some(req) = req.downcast_ref::<kvrpcpb::RawCoprocessorRequest>() {
assert_eq!(req.copr_name, "example");
assert_eq!(req.copr_version_req, "0.1.0");
let resp = kvrpcpb::RawCoprocessorResponse {
data: req.data.clone(),
..Default::default()
};
Ok(Box::new(resp) as Box<dyn Any>)
Ok(Box::new(resp) as Box<dyn Any + Send>)
} else {
unreachable!()
}
Expand All @@ -792,6 +799,7 @@ mod tests {
cf: Some(ColumnFamily::Default),
atomic: false,
logger,
kv_config: KvClientConfig::default(),
};
let resps = client
.coprocessor(
Expand Down
16 changes: 12 additions & 4 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,21 +361,29 @@ pub struct RawCoprocessorRequest {

#[async_trait]
impl Request for RawCoprocessorRequest {
async fn dispatch(&self, client: &TikvClient, options: CallOption) -> Result<Box<dyn Any>> {
async fn dispatch(
&self,
client: &TikvClient,
options: CallOption,
) -> Result<Box<dyn Any + Send>> {
self.inner.dispatch(client, options).await
}

fn label(&self) -> &'static str {
self.inner.label()
}

fn as_any(&self) -> &dyn Any {
fn as_any(&self) -> &(dyn Any + Send) {
self.inner.as_any()
}

fn set_context(&mut self, context: kvrpcpb::Context) {
self.inner.set_context(context);
}

fn to_batch_request(&self) -> tikv_client_proto::tikvpb::batch_commands_request::Request {
todo!()
}
}

impl KvRequest for RawCoprocessorRequest {
Expand Down Expand Up @@ -483,7 +491,7 @@ mod test {
#[ignore]
fn test_raw_scan() {
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|req: &dyn Any| {
|req: &(dyn Any + Send)| {
let req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap();
assert!(req.key_only);
assert_eq!(req.limit, 10);
Expand All @@ -497,7 +505,7 @@ mod test {
resp.kvs.push(kv);
}

Ok(Box::new(resp) as Box<dyn Any>)
Ok(Box::new(resp) as Box<dyn Any + Send>)
},
)));

Expand Down
16 changes: 11 additions & 5 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,27 @@ mod test {

#[async_trait]
impl Request for MockKvRequest {
async fn dispatch(&self, _: &TikvClient, _: CallOption) -> Result<Box<dyn Any>> {
async fn dispatch(&self, _: &TikvClient, _: CallOption) -> Result<Box<dyn Any + Send>> {
Ok(Box::new(MockRpcResponse {}))
}

fn label(&self) -> &'static str {
"mock"
}

fn as_any(&self) -> &dyn Any {
fn as_any(&self) -> &(dyn Any + Send) {
self
}

fn set_context(&mut self, _: kvrpcpb::Context) {
unreachable!();
}

fn to_batch_request(
&self,
) -> tikv_client_proto::tikvpb::batch_commands_request::Request {
todo!()
}
}

#[async_trait]
Expand Down Expand Up @@ -162,7 +168,7 @@ mod test {
};

let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box<dyn Any>),
|_: &(dyn Any + Send)| Ok(Box::new(MockRpcResponse) as Box<dyn Any + Send>),
)));

let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
Expand All @@ -179,12 +185,12 @@ mod test {
#[tokio::test]
async fn test_extract_error() {
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| {
|_: &(dyn Any + Send)| {
Ok(Box::new(kvrpcpb::CommitResponse {
region_error: None,
error: Some(kvrpcpb::KeyError::default()),
commit_version: 0,
}) as Box<dyn Any>)
}) as Box<dyn Any + Send>)
},
)));

Expand Down
4 changes: 2 additions & 2 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
.kv_client
.as_ref()
.expect("Unreachable: kv_client has not been initialised in Dispatch")
.dispatch(&self.request)
.dispatch(Box::new(self.request.clone()))
.await;
let result = stats.done(result);
result.map(|r| {
Expand Down Expand Up @@ -85,7 +85,7 @@ where
preserve_region_results: bool,
) -> Result<<Self as Plan>::Result> {
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
let mut handles = Vec::new();
let mut handles = Vec::with_capacity(shards.len());
for shard in shards {
let (shard, region_store) = shard?;
let mut clone = current_plan.clone();
Expand Down
Loading