feat: impl mysql query with proxy (apache#886)
chunshao90 authored May 10, 2023
1 parent 61e4b7c commit 1ebce65
Showing 9 changed files with 46 additions and 353 deletions.
6 changes: 6 additions & 0 deletions integration_tests/mysql/basic.sh
@@ -5,3 +5,9 @@
mysql -h 127.0.0.1 -P 3307 -e 'show tables'

mysql -h 127.0.0.1 -P 3307 -e 'select 1, now()'

+mysql -h 127.0.0.1 -P 3307 -e 'CREATE TABLE `demo`(`name`string TAG,`id` int TAG,`value` double NOT NULL,`t` timestamp NOT NULL,TIMESTAMP KEY(t)) ENGINE = Analytic with(enable_ttl=false)'
+
+mysql -h 127.0.0.1 -P 3307 -e 'insert into demo (name,value,t)values("ceresdb",1,1683280523000)'
+
+mysql -h 127.0.0.1 -P 3307 -e 'select * from demo'
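
For reference, the same flow the shell test drives through the mysql CLI can also be exercised from Rust. This is a minimal sketch, not part of the commit: it assumes the rust-mysql-simple crate (`mysql`) as the client, the proxy's MySQL port 3307 from the test above, and a deployment that accepts unauthenticated connections.

use mysql::prelude::Queryable;
use mysql::{Opts, Pool, Row};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the proxy's MySQL endpoint (credentials omitted; adjust for your setup).
    let opts = Opts::from_url("mysql://127.0.0.1:3307")?;
    let pool = Pool::new(opts)?;
    let mut conn = pool.get_conn()?;

    // Same statements as basic.sh: create the table, insert one row, read it back.
    conn.query_drop(
        "CREATE TABLE `demo`(`name` string TAG, `id` int TAG, `value` double NOT NULL, \
         `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with(enable_ttl=false)",
    )?;
    conn.query_drop("INSERT INTO demo (name, value, t) VALUES (\"ceresdb\", 1, 1683280523000)")?;

    let rows: Vec<Row> = conn.query("SELECT * FROM demo")?;
    for row in rows {
        println!("{:?}", row);
    }
    Ok(())
}
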
92 changes: 3 additions & 89 deletions proxy/src/grpc/write.rs
@@ -4,9 +4,7 @@ use std::{cmp::max, collections::HashMap, time::Instant};

use ceresdbproto::storage::{
    storage_service_client::StorageServiceClient, RouteRequest, WriteRequest, WriteResponse,
-    WriteTableRequest,
};
-use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
use common_util::error::BoxError;
use futures::{future::try_join_all, FutureExt};
@@ -20,12 +18,12 @@ use snafu::{OptionExt, ResultExt};
use tonic::transport::Channel;

use crate::{
-    create_table, error,
+    error,
    error::{build_ok_header, ErrNoCause, ErrWithCause, InternalNoCause, Result},
-    execute_add_columns_plan, execute_plan, find_new_columns,
+    execute_plan,
    forward::{ForwardResult, ForwarderRef},
    instance::InstanceRef,
-    try_get_table, write_table_request_to_insert_plan, Context, Proxy,
+    Context, Proxy,
};

#[derive(Debug)]
@@ -301,90 +299,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
}
}

-// TODO: use write_request_to_insert_plan in proxy, and remove following code.
-pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
-    instance: InstanceRef<Q>,
-    table_requests: Vec<WriteTableRequest>,
-    schema_config: Option<&SchemaConfig>,
-    write_context: WriteContext,
-) -> Result<Vec<InsertPlan>> {
-    let mut plan_vec = Vec::with_capacity(table_requests.len());
-
-    let WriteContext {
-        request_id,
-        catalog,
-        schema,
-        deadline,
-        auto_create_table,
-    } = write_context;
-    let schema_config = schema_config.cloned().unwrap_or_default();
-    for write_table_req in table_requests {
-        let table_name = &write_table_req.table;
-        let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
-
-        match table.clone() {
-            None => {
-                if auto_create_table {
-                    create_table(
-                        request_id,
-                        &catalog,
-                        &schema,
-                        instance.clone(),
-                        &write_table_req,
-                        &schema_config,
-                        deadline,
-                    )
-                    .await?;
-                    // try to get table again
-                    table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
-                }
-            }
-            Some(t) => {
-                if auto_create_table {
-                    // The reasons for making the decision to add columns before writing are as
-                    // follows:
-                    // * If judged based on the error message returned, it may cause data that has
-                    //   already been successfully written to be written again and affect the
-                    //   accuracy of the data.
-                    // * Currently, the decision to add columns is made at the request level, not at
-                    //   the row level, so the cost is relatively small.
-                    let table_schema = t.schema();
-                    let columns =
-                        find_new_columns(&table_schema, &schema_config, &write_table_req)?;
-                    if !columns.is_empty() {
-                        execute_add_columns_plan(
-                            request_id,
-                            &catalog,
-                            &schema,
-                            instance.clone(),
-                            t,
-                            columns,
-                            deadline,
-                        )
-                        .await?;
-                    }
-                }
-            }
-        }
-
-        match table {
-            Some(table) => {
-                let plan = write_table_request_to_insert_plan(table, write_table_req)?;
-                plan_vec.push(plan);
-            }
-            None => {
-                return ErrNoCause {
-                    code: StatusCode::BAD_REQUEST,
-                    msg: format!("Table not found, schema:{schema}, table:{table_name}"),
-                }
-                .fail();
-            }
-        }
-    }
-
-    Ok(plan_vec)
-}

pub async fn execute_insert_plan<Q: QueryExecutor + 'static>(
    request_id: RequestId,
    catalog: &str,
1 change: 0 additions & 1 deletion proxy/src/handlers/mod.rs
@@ -4,7 +4,6 @@

pub mod admin;
pub(crate) mod error;
-pub mod query;
pub mod route;

mod prelude {
218 changes: 0 additions & 218 deletions proxy/src/handlers/query.rs

This file was deleted.

20 changes: 8 additions & 12 deletions proxy/src/influxdb/mod.rs
@@ -24,7 +24,7 @@ use crate::{
    context::RequestContext,
    error::{ErrNoCause, ErrWithCause, Internal, Result},
    execute_plan,
-    grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
+    grpc::write::{execute_insert_plan, WriteContext},
    influxdb::types::{
        convert_influxql_output, convert_write_request, InfluxqlRequest, InfluxqlResponse,
        WriteRequest, WriteResponse,
@@ -63,17 +63,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
        let write_context =
            WriteContext::new(request_id, deadline, catalog.clone(), schema.clone());

-        let plans = write_request_to_insert_plan(
-            self.instance.clone(),
-            convert_write_request(req)?,
-            schema_config,
-            write_context,
-        )
-        .await
-        .box_err()
-        .with_context(|| Internal {
-            msg: "write request to insert plan",
-        })?;
+        let plans = self
+            .write_request_to_insert_plan(convert_write_request(req)?, schema_config, write_context)
+            .await
+            .box_err()
+            .with_context(|| Internal {
+                msg: "write request to insert plan",
+            })?;

        let mut success = 0;
        for insert_plan in plans {
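
The call above now goes through a method on Proxy rather than the free write_request_to_insert_plan this commit removes from grpc/write.rs (see the TODO deleted there). A rough sketch of the shape that call site implies follows; the visibility, exact parameter types, and body are assumptions inferred from the old free function, not something shown in this diff.

impl<Q: QueryExecutor + 'static> Proxy<Q> {
    // Presumed relocation of the old helper: same inputs as before, minus the
    // InstanceRef argument, which is now reached through `self`.
    async fn write_request_to_insert_plan(
        &self,
        table_requests: Vec<WriteTableRequest>,
        schema_config: Option<&SchemaConfig>,
        write_context: WriteContext,
    ) -> Result<Vec<InsertPlan>> {
        // Body elided in this sketch; per the removed TODO, the logic mirrors
        // the deleted function: optionally create the table or add new columns
        // when auto_create_table is set, then build one InsertPlan per
        // WriteTableRequest.
        todo!()
    }
}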