Skip to content

Commit

Permalink
refactor by CR
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 23, 2023
1 parent 8cb63ef commit 4fbcf34
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ use crate::{
Context, Proxy,
};

type WriteResponseFutures<'a> =
Vec<BoxFuture<'a, common_util::runtime::Result<Result<WriteResponse>>>>;

#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
Expand All @@ -71,13 +74,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
) -> Result<WriteResponse> {
let write_context = req.context.clone();
let resp = if self.cluster_with_meta {
self.handle_with_meta_write(ctx, req).await?
self.handle_write_with_meta(ctx, req).await?
} else {
self.handle_without_meta_write(ctx, req).await?
self.handle_write_without_meta(ctx, req).await?
};

debug!(
"Grpc handle write finished, write_context:{:?}, resp:{:?}",
"Execute write finished, write_context:{:?}, resp:{:?}",
write_context, resp
);
Ok(resp)
Expand All @@ -87,7 +90,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
// 1. Create table via ceresmeta if it does not exist.
// 2. Split write request.
// 3. Process write.
async fn handle_with_meta_write(
async fn handle_write_with_meta(
&self,
ctx: Context,
req: WriteRequest,
Expand Down Expand Up @@ -122,7 +125,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
// 1. Split write request.
// 2. Create table if not exist.
// 3. Process write.
async fn handle_without_meta_write(
async fn handle_write_without_meta(
&self,
ctx: Context,
req: WriteRequest,
Expand Down Expand Up @@ -356,7 +359,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

async fn collect_write_to_remote_future(
&self,
futures: &mut Vec<BoxFuture<'_, common_util::runtime::Result<Result<WriteResponse>>>>,
futures: &mut WriteResponseFutures<'_>,
write_request: HashMap<Endpoint, WriteRequest>,
) {
for (endpoint, table_write_request) in write_request {
Expand All @@ -372,7 +375,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
#[inline]
async fn collect_write_to_local_future<'a>(
&'a self,
futures: &mut Vec<BoxFuture<'a, common_util::runtime::Result<Result<WriteResponse>>>>,
futures: &mut WriteResponseFutures<'a>,
ctx: Context,
request_id: RequestId,
write_request: WriteRequest,
Expand Down

0 comments on commit 4fbcf34

Please sign in to comment.