refactor: proxy write and route #889

Merged 3 commits on May 12, 2023
Changes from all commits
15 changes: 2 additions & 13 deletions proxy/src/grpc/route.rs
@@ -1,24 +1,13 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use ceresdbproto::storage::{RouteRequest, RouteResponse};
use common_util::error::BoxError;
use http::StatusCode;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;

use crate::{error, error::ErrWithCause, Context, Proxy};
use crate::{error, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse {
let routes = self
.router
.route(req)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to route",
});
let routes = self.route(req).await;

let mut resp = RouteResponse::default();
match routes {
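For reference, a minimal sketch (not part of this diff) of the shared routing helper that handle_route now delegates to. The helper's actual location, visibility, and return type are assumptions; the error-wrapping shape is copied from the lines removed above.

use ceresdbproto::storage::{Route, RouteRequest};
use common_util::error::BoxError;
use http::StatusCode;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;

use crate::{
    error::{ErrWithCause, Result},
    Proxy,
};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
    // Assumed shared helper: run the inner router and map its error into the
    // proxy error type, as the removed handle_route body did inline.
    pub(crate) async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
        self.router
            .route(req)
            .await
            .box_err()
            .context(ErrWithCause {
                code: StatusCode::INTERNAL_SERVER_ERROR,
                msg: "fail to route",
            })
    }
}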
313 changes: 7 additions & 306 deletions proxy/src/grpc/write.rs
@@ -1,57 +1,9 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{cmp::max, collections::HashMap, time::Instant};

use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RouteRequest, WriteRequest, WriteResponse,
};
use common_types::request_id::RequestId;
use common_util::error::BoxError;
use futures::{future::try_join_all, FutureExt};
use http::StatusCode;
use interpreters::interpreter::Output;
use log::debug;
use ceresdbproto::storage::{WriteRequest, WriteResponse};
use query_engine::executor::Executor as QueryExecutor;
use query_frontend::plan::{InsertPlan, Plan};
use router::endpoint::Endpoint;
use snafu::{OptionExt, ResultExt};
use tonic::transport::Channel;

use crate::{
error,
error::{build_ok_header, ErrNoCause, ErrWithCause, InternalNoCause, Result},
execute_plan,
forward::{ForwardResult, ForwarderRef},
instance::InstanceRef,
Context, Proxy,
};

#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
pub deadline: Option<Instant>,
pub catalog: String,
pub schema: String,
pub auto_create_table: bool,
}

impl WriteContext {
pub fn new(
request_id: RequestId,
deadline: Option<Instant>,
catalog: String,
schema: String,
) -> Self {
let auto_create_table = true;
Self {
request_id,
deadline,
catalog,
schema,
auto_create_table,
}
}
}
use crate::{error, error::build_ok_header, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_write(&self, ctx: Context, req: WriteRequest) -> WriteResponse {
@@ -64,262 +16,11 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
..Default::default()
}
}
Ok(v) => v,
}
}

async fn handle_write_internal(
&self,
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
let write_context = req.context.clone().context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;

let (write_request_to_local, write_requests_to_forward) =
self.split_write_request(req).await?;

let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1);

// Write to remote.
for (endpoint, table_write_request) in write_requests_to_forward {
let forwarder = self.forwarder.clone();
let write_handle = self.engine_runtimes.io_runtime.spawn(async move {
Self::write_to_remote(forwarder, endpoint, table_write_request).await
});

futures.push(write_handle.boxed());
}

// Write to local.
if !write_request_to_local.table_requests.is_empty() {
let local_handle =
async move { Ok(self.write_to_local(ctx, write_request_to_local).await) };
futures.push(local_handle.boxed());
}

let resps = try_join_all(futures)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to join task",
})?;

debug!(
"Grpc handle write finished, schema:{}, resps:{:?}",
write_context.database, resps
);

let mut success = 0;
for resp in resps {
success += resp?.success;
}

Ok(WriteResponse {
success,
header: Some(build_ok_header()),
..Default::default()
})
}

async fn write_to_remote(
forwarder: ForwarderRef,
endpoint: Endpoint,
table_write_request: WriteRequest,
) -> Result<WriteResponse> {
let do_write = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<WriteRequest>,
_: &Endpoint| {
let write = async move {
client
.write(request)
.await
.map(|resp| resp.into_inner())
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Forwarded write request failed",
})
}
.boxed();

Box::new(write) as _
};

let forward_result = forwarder
.forward_with_endpoint(endpoint, tonic::Request::new(table_write_request), do_write)
.await;
let forward_res = forward_result
.map_err(|e| {
error!("Failed to forward sql req but the error is ignored, err:{e}");
e
})
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Local response is not expected",
})?;

match forward_res {
ForwardResult::Forwarded(resp) => resp,
ForwardResult::Local => InternalNoCause {
msg: "Local response is not expected".to_string(),
}
.fail(),
}
}

async fn write_to_local(&self, ctx: Context, req: WriteRequest) -> Result<WriteResponse> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();
let req_ctx = req.context.context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;
let schema = req_ctx.database;
let schema_config = self
.schema_config_provider
.schema_config(&schema)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Fail to fetch schema config, schema_name:{schema}"),
})?;

debug!(
"Grpc handle write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
req.table_requests
.first()
.map(|m| (&m.table, &m.tag_names, &m.field_names)),
req.table_requests.len(),
);

let write_context = WriteContext {
request_id,
deadline,
catalog: catalog.to_string(),
schema: schema.clone(),
auto_create_table: self.auto_create_table,
};

let plan_vec = self
.write_request_to_insert_plan(req.table_requests, schema_config, write_context)
.await?;

let mut success = 0;
for insert_plan in plan_vec {
success += execute_insert_plan(
request_id,
catalog,
&schema,
self.instance.clone(),
insert_plan,
deadline,
)
.await?;
}

Ok(WriteResponse {
success: success as u32,
header: Some(build_ok_header()),
..Default::default()
})
}

async fn split_write_request(
&self,
req: WriteRequest,
) -> Result<(WriteRequest, HashMap<Endpoint, WriteRequest>)> {
// Split write request into multiple requests, each request contains table
// belong to one remote engine.
let tables = req
.table_requests
.iter()
.map(|table_request| table_request.table.clone())
.collect();

// TODO: Make the router can accept an iterator over the tables to avoid the
// memory allocation here.
let route_data = self
.router
.route(RouteRequest {
context: req.context.clone(),
tables,
})
.await?;
let forwarded_table_routes = route_data
.into_iter()
.filter_map(|router| {
router
.endpoint
.map(|endpoint| (router.table, endpoint.into()))
})
.filter(|router| !self.forwarder.is_local_endpoint(&router.1))
.collect::<HashMap<_, _>>();

// No table need to be forwarded.
if forwarded_table_routes.is_empty() {
return Ok((req, HashMap::default()));
}

let mut table_requests_to_local = WriteRequest {
table_requests: Vec::with_capacity(max(
req.table_requests.len() - forwarded_table_routes.len(),
0,
)),
context: req.context.clone(),
};

let mut table_requests_to_forward = HashMap::with_capacity(forwarded_table_routes.len());

let write_context = req.context;
for table_request in req.table_requests {
let route = forwarded_table_routes.get(&table_request.table);
match route {
Some(endpoint) => {
let table_requests = table_requests_to_forward
.entry(endpoint.clone())
.or_insert_with(|| WriteRequest {
context: write_context.clone(),
table_requests: Vec::new(),
});
table_requests.table_requests.push(table_request);
}
_ => {
table_requests_to_local.table_requests.push(table_request);
}
}
Ok(v) => WriteResponse {
header: Some(build_ok_header()),
success: v.success,
failed: v.failed,
},
}
Ok((table_requests_to_local, table_requests_to_forward))
}
}

pub async fn execute_insert_plan<Q: QueryExecutor + 'static>(
request_id: RequestId,
catalog: &str,
schema: &str,
instance: InstanceRef<Q>,
insert_plan: InsertPlan,
deadline: Option<Instant>,
) -> Result<usize> {
debug!(
"Grpc handle write table begin, table:{}, row_num:{}",
insert_plan.table.name(),
insert_plan.rows.num_rows()
);
let plan = Plan::Insert(insert_plan);
let output = execute_plan(request_id, catalog, schema, instance, plan, deadline).await;
output.and_then(|output| match output {
Output::AffectedRows(n) => Ok(n),
Output::Records(_) => ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: "Invalid output type, expect AffectedRows, found Records",
}
.fail(),
})
}
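For reference, a minimal sketch (not part of this diff) of what the trimmed handle_write presumably looks like after the shared write path moved out of this file. The helper name handle_write_internal is taken from the removed code; whether it keeps that name, and the exact error log message and build_err_header call, are assumptions. The Ok arm mirrors the lines added above.

use ceresdbproto::storage::{WriteRequest, WriteResponse};
use log::error;
use query_engine::executor::Executor as QueryExecutor;

use crate::{error, error::build_ok_header, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
    pub async fn handle_write(&self, ctx: Context, req: WriteRequest) -> WriteResponse {
        // Splitting the request, forwarding remote tables, and writing local
        // tables are assumed to live in a shared helper after this refactor.
        match self.handle_write_internal(ctx, req).await {
            Err(e) => {
                error!("Failed to handle write, err:{e}");
                WriteResponse {
                    // build_err_header is an assumed counterpart of build_ok_header.
                    header: Some(error::build_err_header(e)),
                    ..Default::default()
                }
            }
            Ok(v) => WriteResponse {
                header: Some(build_ok_header()),
                success: v.success,
                failed: v.failed,
            },
        }
    }
}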
6 changes: 0 additions & 6 deletions proxy/src/handlers/error.rs
@@ -71,12 +71,6 @@ pub enum Error {
source: tokio::time::error::Elapsed,
backtrace: Backtrace,
},

#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
RouteHandler {
table: String,
source: router::Error,
},
}

define_result!(Error);
1 change: 0 additions & 1 deletion proxy/src/handlers/mod.rs
@@ -4,7 +4,6 @@

pub mod admin;
pub(crate) mod error;
pub mod route;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;