Skip to content

Commit

Permalink
refactor by CR
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 23, 2023
1 parent 8cb63ef commit 43383d7
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ use crate::{
Context, Proxy,
};

type WriteResponseFutures<'a> =
Vec<BoxFuture<'a, common_util::runtime::Result<Result<WriteResponse>>>>;

#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
Expand All @@ -71,13 +74,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
) -> Result<WriteResponse> {
let write_context = req.context.clone();
let resp = if self.cluster_with_meta {
self.handle_with_meta_write(ctx, req).await?
self.handle_write_with_meta(ctx, req).await?
} else {
self.handle_without_meta_write(ctx, req).await?
self.handle_write_without_meta(ctx, req).await?
};

debug!(
"Grpc handle write finished, write_context:{:?}, resp:{:?}",
"Handle write finished, write_context:{:?}, resp:{:?}",
write_context, resp
);
Ok(resp)
Expand All @@ -87,7 +90,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
// 1. Create table via ceresmeta if it does not exist.
// 2. Split write request.
// 3. Process write.
async fn handle_with_meta_write(
async fn handle_write_with_meta(
&self,
ctx: Context,
req: WriteRequest,
Expand Down Expand Up @@ -122,7 +125,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
// 1. Split write request.
// 2. Create table if not exist.
// 3. Process write.
async fn handle_without_meta_write(
async fn handle_write_without_meta(
&self,
ctx: Context,
req: WriteRequest,
Expand Down Expand Up @@ -272,7 +275,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
),
})?;

debug!("Grpc handle create table begin, plan:{:?}", plan);
debug!("Execute create table begin, plan:{:?}", plan);

let output = self
.execute_plan(request_id, catalog, schema, plan, deadline)
Expand Down Expand Up @@ -356,7 +359,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

async fn collect_write_to_remote_future(
&self,
futures: &mut Vec<BoxFuture<'_, common_util::runtime::Result<Result<WriteResponse>>>>,
futures: &mut WriteResponseFutures<'_>,
write_request: HashMap<Endpoint, WriteRequest>,
) {
for (endpoint, table_write_request) in write_request {
Expand All @@ -372,7 +375,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
#[inline]
async fn collect_write_to_local_future<'a>(
&'a self,
futures: &mut Vec<BoxFuture<'a, common_util::runtime::Result<Result<WriteResponse>>>>,
futures: &mut WriteResponseFutures<'a>,
ctx: Context,
request_id: RequestId,
write_request: WriteRequest,
Expand Down Expand Up @@ -483,7 +486,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
})?;

debug!(
"Grpc handle write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
"Local write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
req.table_requests
.first()
.map(|m| (&m.table, &m.tag_names, &m.field_names)),
Expand Down Expand Up @@ -581,7 +584,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
deadline: Option<Instant>,
) -> Result<usize> {
debug!(
"Grpc handle write table begin, table:{}, row_num:{}",
"Execute insert plan begin, table:{}, row_num:{}",
insert_plan.table.name(),
insert_plan.rows.num_rows()
);
Expand Down

0 comments on commit 43383d7

Please sign in to comment.