diff --git a/integration_tests/mysql/basic.sh b/integration_tests/mysql/basic.sh index e4ef13e963..38e70fceb5 100755 --- a/integration_tests/mysql/basic.sh +++ b/integration_tests/mysql/basic.sh @@ -5,3 +5,9 @@ mysql -h 127.0.0.1 -P 3307 -e 'show tables' mysql -h 127.0.0.1 -P 3307 -e 'select 1, now()' + +mysql -h 127.0.0.1 -P 3307 -e 'CREATE TABLE `demo`(`name`string TAG,`id` int TAG,`value` double NOT NULL,`t` timestamp NOT NULL,TIMESTAMP KEY(t)) ENGINE = Analytic with(enable_ttl=false)' + +mysql -h 127.0.0.1 -P 3307 -e 'insert into demo (name,value,t)values("ceresdb",1,1683280523000)' + +mysql -h 127.0.0.1 -P 3307 -e 'select * from demo' diff --git a/proxy/src/grpc/write.rs b/proxy/src/grpc/write.rs index 49b638829e..3ffe13f8c6 100644 --- a/proxy/src/grpc/write.rs +++ b/proxy/src/grpc/write.rs @@ -4,9 +4,7 @@ use std::{cmp::max, collections::HashMap, time::Instant}; use ceresdbproto::storage::{ storage_service_client::StorageServiceClient, RouteRequest, WriteRequest, WriteResponse, - WriteTableRequest, }; -use cluster::config::SchemaConfig; use common_types::request_id::RequestId; use common_util::error::BoxError; use futures::{future::try_join_all, FutureExt}; @@ -20,12 +18,12 @@ use snafu::{OptionExt, ResultExt}; use tonic::transport::Channel; use crate::{ - create_table, error, + error, error::{build_ok_header, ErrNoCause, ErrWithCause, InternalNoCause, Result}, - execute_add_columns_plan, execute_plan, find_new_columns, + execute_plan, forward::{ForwardResult, ForwarderRef}, instance::InstanceRef, - try_get_table, write_table_request_to_insert_plan, Context, Proxy, + Context, Proxy, }; #[derive(Debug)] @@ -301,90 +299,6 @@ impl Proxy { } } -// TODO: use write_request_to_insert_plan in proxy, and remove following code. -pub async fn write_request_to_insert_plan( - instance: InstanceRef, - table_requests: Vec, - schema_config: Option<&SchemaConfig>, - write_context: WriteContext, -) -> Result> { - let mut plan_vec = Vec::with_capacity(table_requests.len()); - - let WriteContext { - request_id, - catalog, - schema, - deadline, - auto_create_table, - } = write_context; - let schema_config = schema_config.cloned().unwrap_or_default(); - for write_table_req in table_requests { - let table_name = &write_table_req.table; - let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?; - - match table.clone() { - None => { - if auto_create_table { - create_table( - request_id, - &catalog, - &schema, - instance.clone(), - &write_table_req, - &schema_config, - deadline, - ) - .await?; - // try to get table again - table = try_get_table(&catalog, &schema, instance.clone(), table_name)?; - } - } - Some(t) => { - if auto_create_table { - // The reasons for making the decision to add columns before writing are as - // follows: - // * If judged based on the error message returned, it may cause data that has - // already been successfully written to be written again and affect the - // accuracy of the data. - // * Currently, the decision to add columns is made at the request level, not at - // the row level, so the cost is relatively small. - let table_schema = t.schema(); - let columns = - find_new_columns(&table_schema, &schema_config, &write_table_req)?; - if !columns.is_empty() { - execute_add_columns_plan( - request_id, - &catalog, - &schema, - instance.clone(), - t, - columns, - deadline, - ) - .await?; - } - } - } - } - - match table { - Some(table) => { - let plan = write_table_request_to_insert_plan(table, write_table_req)?; - plan_vec.push(plan); - } - None => { - return ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Table not found, schema:{schema}, table:{table_name}"), - } - .fail(); - } - } - } - - Ok(plan_vec) -} - pub async fn execute_insert_plan( request_id: RequestId, catalog: &str, diff --git a/proxy/src/handlers/mod.rs b/proxy/src/handlers/mod.rs index da1c6e9996..663d9ccdce 100644 --- a/proxy/src/handlers/mod.rs +++ b/proxy/src/handlers/mod.rs @@ -4,7 +4,6 @@ pub mod admin; pub(crate) mod error; -pub mod query; pub mod route; mod prelude { diff --git a/proxy/src/handlers/query.rs b/proxy/src/handlers/query.rs deleted file mode 100644 index b3140621f8..0000000000 --- a/proxy/src/handlers/query.rs +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. - -//! SQL request handler - -use std::time::Instant; - -use common_types::{ - bytes::Bytes, - datum::{Datum, DatumKind}, - request_id::RequestId, -}; -use common_util::time::InstantExt; -use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; -use log::info; -use query_frontend::{ - frontend::{Context as SqlContext, Frontend}, - provider::CatalogMetaProvider, -}; -use serde::{ - ser::{SerializeMap, SerializeSeq}, - Serialize, -}; -use snafu::{ensure, ResultExt}; - -use crate::handlers::{ - error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt}, - prelude::*, -}; - -#[derive(Debug, Deserialize)] -pub struct Request { - query: String, -} - -pub struct ResponseRows { - pub column_names: Vec, - pub data: Vec>, -} - -pub struct ResponseColumn { - pub name: String, - pub data_type: DatumKind, -} - -struct Row<'a>(Vec<(&'a String, &'a Datum)>); - -impl<'a> Serialize for Row<'a> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - let rows = &self.0; - let mut map = serializer.serialize_map(Some(rows.len()))?; - for (key, value) in rows { - map.serialize_entry(key, value)?; - } - map.end() - } -} - -impl Serialize for ResponseRows { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - let total_count = self.data.len(); - let mut seq = serializer.serialize_seq(Some(total_count))?; - - for rows in &self.data { - let data = rows - .iter() - .enumerate() - .map(|(col_idx, datum)| { - let column_name = &self.column_names[col_idx].name; - (column_name, datum) - }) - .collect::>(); - let row = Row(data); - seq.serialize_element(&row)?; - } - - seq.end() - } -} - -impl From for Request { - fn from(query: String) -> Self { - Self { query } - } -} - -impl From for Request { - fn from(bytes: Bytes) -> Self { - Request::from(String::from_utf8_lossy(&bytes).to_string()) - } -} - -#[derive(Debug)] -pub enum QueryRequest { - Sql(Request), -} -impl QueryRequest { - pub fn query(&self) -> &str { - match self { - QueryRequest::Sql(request) => request.query.as_str(), - } - } -} - -pub async fn handle_query( - ctx: &RequestContext, - instance: InstanceRef, - query_request: QueryRequest, -) -> Result { - let request_id = RequestId::next_id(); - let begin_instant = Instant::now(); - let deadline = ctx.timeout.map(|t| begin_instant + t); - - info!( - "Query handler try to process request, request_id:{}, request:{:?}", - request_id, query_request - ); - - // TODO(yingwen): Privilege check, cannot access data of other tenant - // TODO(yingwen): Maybe move MetaProvider to instance - let provider = CatalogMetaProvider { - manager: instance.catalog_manager.clone(), - default_catalog: &ctx.catalog, - default_schema: &ctx.schema, - function_registry: &*instance.function_registry, - }; - let frontend = Frontend::new(provider); - let mut sql_ctx = SqlContext::new(request_id, deadline); - - let plan = match &query_request { - QueryRequest::Sql(request) => { - // Parse sql, frontend error of invalid sql already contains sql - // TODO(yingwen): Maybe move sql from frontend error to outer error - let mut stmts = frontend - .parse_sql(&mut sql_ctx, &request.query) - .context(ParseSql)?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, - TooMuchStmt { - len: stmts.len(), - query: &request.query, - } - ); - - // Create logical plan - // Note: Remember to store sql in error when creating logical plan - frontend - .statement_to_plan(&mut sql_ctx, stmts.remove(0)) - .context(CreatePlan { - query: &request.query, - })? - } - }; - - instance.limiter.try_limit(&plan).context(QueryBlock { - query: query_request.query(), - })?; - - // Execute in interpreter - let interpreter_ctx = InterpreterContext::builder(request_id, deadline) - // Use current ctx's catalog and tenant as default catalog and tenant - .default_catalog_and_schema(ctx.catalog.to_string(), ctx.schema.to_string()) - .enable_partition_table_access(ctx.enable_partition_table_access) - .build(); - let interpreter_factory = Factory::new( - instance.query_executor.clone(), - instance.catalog_manager.clone(), - instance.table_engine.clone(), - instance.table_manipulator.clone(), - ); - let interpreter = - interpreter_factory - .create(interpreter_ctx, plan) - .context(InterpreterExec { - query: query_request.query(), - })?; - - let output = if let Some(deadline) = deadline { - tokio::time::timeout_at( - tokio::time::Instant::from_std(deadline), - interpreter.execute(), - ) - .await - .context(QueryTimeout { - query: query_request.query(), - }) - .and_then(|v| { - v.context(InterpreterExec { - query: query_request.query(), - }) - })? - } else { - interpreter.execute().await.context(InterpreterExec { - query: query_request.query(), - })? - }; - - info!( - "Query handler finished, request_id:{}, cost:{}ms, request:{:?}", - request_id, - begin_instant.saturating_elapsed().as_millis(), - query_request - ); - - Ok(output) -} diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index 974a975c28..cfb910e31a 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -24,7 +24,7 @@ use crate::{ context::RequestContext, error::{ErrNoCause, ErrWithCause, Internal, Result}, execute_plan, - grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, + grpc::write::{execute_insert_plan, WriteContext}, influxdb::types::{ convert_influxql_output, convert_write_request, InfluxqlRequest, InfluxqlResponse, WriteRequest, WriteResponse, @@ -63,17 +63,13 @@ impl Proxy { let write_context = WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); - let plans = write_request_to_insert_plan( - self.instance.clone(), - convert_write_request(req)?, - schema_config, - write_context, - ) - .await - .box_err() - .with_context(|| Internal { - msg: "write request to insert plan", - })?; + let plans = self + .write_request_to_insert_plan(convert_write_request(req)?, schema_config, write_context) + .await + .box_err() + .with_context(|| Internal { + msg: "write request to insert plan", + })?; let mut success = 0; for insert_plan in plans { diff --git a/server/src/mysql/builder.rs b/server/src/mysql/builder.rs index ad8b36e06d..bbef0350c3 100644 --- a/server/src/mysql/builder.rs +++ b/server/src/mysql/builder.rs @@ -2,7 +2,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; -use proxy::instance::InstanceRef; +use proxy::Proxy; use query_engine::executor::Executor as QueryExecutor; use router::RouterRef; use snafu::{OptionExt, ResultExt}; @@ -16,7 +16,7 @@ use crate::mysql::{ pub struct Builder { config: Config, runtimes: Option>, - instance: Option>, + proxy: Option>>, router: Option, } @@ -32,7 +32,7 @@ impl Builder { Self { config, runtimes: None, - instance: None, + proxy: None, router: None, } } @@ -42,8 +42,8 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { - self.instance = Some(instance); + pub fn proxy(mut self, proxy: Arc>) -> Self { + self.proxy = Some(proxy); self } @@ -56,15 +56,14 @@ impl Builder { impl Builder { pub fn build(self) -> Result> { let runtimes = self.runtimes.context(MissingRuntimes)?; - let instance = self.instance.context(MissingInstance)?; + let proxy = self.proxy.context(MissingInstance)?; let router = self.router.context(MissingRouter)?; let addr: SocketAddr = format!("{}:{}", self.config.ip, self.config.port) .parse() .context(ParseIpAddr { ip: self.config.ip })?; - let mysql_handler = - MysqlService::new(instance, runtimes, router, addr, self.config.timeout); + let mysql_handler = MysqlService::new(proxy, runtimes, router, addr, self.config.timeout); Ok(mysql_handler) } } diff --git a/server/src/mysql/service.rs b/server/src/mysql/service.rs index 98b22b5d53..0e41ac9f05 100644 --- a/server/src/mysql/service.rs +++ b/server/src/mysql/service.rs @@ -5,7 +5,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use common_util::runtime::JoinHandle; use log::{error, info}; use opensrv_mysql::AsyncMysqlIntermediary; -use proxy::instance::{Instance, InstanceRef}; +use proxy::Proxy; use query_engine::executor::Executor as QueryExecutor; use router::RouterRef; use table_engine::engine::EngineRuntimes; @@ -14,7 +14,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::mysql::{error::Result, worker::MysqlWorker}; pub struct MysqlService { - instance: InstanceRef, + proxy: Arc>, runtimes: Arc, router: RouterRef, socket_addr: SocketAddr, @@ -25,14 +25,14 @@ pub struct MysqlService { impl MysqlService { pub fn new( - instance: Arc>, + proxy: Arc>, runtimes: Arc, router: RouterRef, socket_addr: SocketAddr, timeout: Option, ) -> MysqlService { Self { - instance, + proxy, runtimes, router, socket_addr, @@ -53,7 +53,7 @@ impl MysqlService { info!("MySQL server tries to listen on {}", self.socket_addr); self.join_handler = Some(rt.default_runtime.spawn(Self::loop_accept( - self.instance.clone(), + self.proxy.clone(), self.runtimes.clone(), self.router.clone(), self.socket_addr, @@ -70,7 +70,7 @@ impl MysqlService { } async fn loop_accept( - instance: InstanceRef, + proxy: Arc>, runtimes: Arc, router: RouterRef, socket_addr: SocketAddr, @@ -92,12 +92,12 @@ impl MysqlService { break; } }; - let instance = instance.clone(); + let proxy = proxy.clone(); let router = router.clone(); let rt = runtimes.read_runtime.clone(); rt.spawn(AsyncMysqlIntermediary::run_on( - MysqlWorker::new(instance, router, timeout), + MysqlWorker::new(proxy, router, timeout), stream, )); }, diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs index 1716fbcf73..c1aa93951a 100644 --- a/server/src/mysql/worker.rs +++ b/server/src/mysql/worker.rs @@ -6,14 +6,7 @@ use common_util::error::BoxError; use interpreters::interpreter::Output; use log::{error, info}; use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter}; -use proxy::{ - context::RequestContext, - handlers::{ - self, - query::{QueryRequest, Request}, - }, - instance::Instance, -}; +use proxy::{context::RequestContext, http::sql::Request, Proxy}; use query_engine::executor::Executor as QueryExecutor; use router::RouterRef; use snafu::ResultExt; @@ -25,7 +18,7 @@ use crate::mysql::{ pub struct MysqlWorker { generic_hold: PhantomData, - instance: Arc>, + proxy: Arc>, // TODO: Maybe support route in mysql protocol #[allow(dead_code)] router: RouterRef, @@ -37,10 +30,10 @@ where W: std::io::Write + Send + Sync, Q: QueryExecutor + 'static, { - pub fn new(instance: Arc>, router: RouterRef, timeout: Option) -> Self { + pub fn new(proxy: Arc>, router: RouterRef, timeout: Option) -> Self { Self { generic_hold: PhantomData::default(), - instance, + proxy, router, timeout, } @@ -111,9 +104,11 @@ where { async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result { let ctx = self.create_ctx()?; - let req = Request::from(sql.to_string()); - let req = QueryRequest::Sql(req); - handlers::query::handle_query(&ctx, self.instance.clone(), req) + let req = Request { + query: sql.to_string(), + }; + self.proxy + .handle_http_sql_query(&ctx, req) .await .map_err(|e| { error!("Mysql service Failed to handle sql, err: {}", e); @@ -127,12 +122,14 @@ where fn create_ctx(&self) -> Result { let default_catalog = self - .instance + .proxy + .instance() .catalog_manager .default_catalog_name() .to_string(); let default_schema = self - .instance + .proxy + .instance() .catalog_manager .default_schema_name() .to_string(); diff --git a/server/src/server.rs b/server/src/server.rs index 34744a9d47..2712df3d9d 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -386,7 +386,7 @@ impl Builder { let mysql_service = mysql::Builder::new(mysql_config) .runtimes(engine_runtimes.clone()) - .instance(instance.clone()) + .proxy(proxy.clone()) .router(router.clone()) .build() .context(BuildMysqlService)?;