Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support auto create table config #713

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
target
.DS_Store
.idea/
.vscode
.vscode
5 changes: 5 additions & 0 deletions server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub struct ServerConfig {

/// Config for forwarding
pub forward: forward::Config,

/// Whether to create table automatically when data is first written, only
/// used in gRPC
pub auto_create_table: bool,
}

impl Default for ServerConfig {
Expand All @@ -120,6 +124,7 @@ impl Default for ServerConfig {
grpc_server_cq_count: 20,
resp_compress_min_length: ReadableSize::mb(4),
forward: forward::Config::default(),
auto_create_table: true,
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ pub struct Builder<Q> {
opened_wals: Option<OpenedWals>,
schema_config_provider: Option<SchemaConfigProviderRef>,
forward_config: Option<forward::Config>,
auto_create_table: bool,
}

impl<Q> Builder<Q> {
Expand All @@ -228,6 +229,7 @@ impl<Q> Builder<Q> {
opened_wals: None,
schema_config_provider: None,
forward_config: None,
auto_create_table: true,
}
}

Expand Down Expand Up @@ -287,6 +289,11 @@ impl<Q> Builder<Q> {
self.timeout = timeout;
self
}

pub fn auto_create_table(mut self, auto_create_table: bool) -> Self {
self.auto_create_table = auto_create_table;
self
}
}

impl<Q: QueryExecutor + 'static> Builder<Q> {
Expand Down Expand Up @@ -339,6 +346,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
forwarder,
timeout: self.timeout,
resp_compress_min_length: self.resp_compress_min_length,
auto_create_table: self.auto_create_table,
};
let rpc_server = StorageServiceServer::new(storage_service);

Expand Down
10 changes: 9 additions & 1 deletion server/src/grpc/storage_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub struct HandlerContext<'a, Q> {
forwarder: Option<ForwarderRef>,
timeout: Option<Duration>,
resp_compress_min_length: usize,
auto_create_table: bool,
}

impl<'a, Q> HandlerContext<'a, Q> {
Expand All @@ -110,6 +111,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
forwarder: Option<ForwarderRef>,
timeout: Option<Duration>,
resp_compress_min_length: usize,
auto_create_table: bool,
) -> Self {
// catalog is not exposed to protocol layer
let catalog = instance.catalog_manager.default_catalog_name().to_string();
Expand All @@ -123,6 +125,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
forwarder,
timeout,
resp_compress_min_length,
auto_create_table,
}
}

Expand All @@ -141,6 +144,7 @@ pub struct StorageServiceImpl<Q: QueryExecutor + 'static> {
pub forwarder: Option<ForwarderRef>,
pub timeout: Option<Duration>,
pub resp_compress_min_length: usize,
pub auto_create_table: bool,
}

macro_rules! handle_request {
Expand All @@ -158,6 +162,7 @@ macro_rules! handle_request {
let forwarder = self.forwarder.clone();
let timeout = self.timeout;
let resp_compress_min_length = self.resp_compress_min_length;
let auto_create_table = self.auto_create_table;

// The future spawned by tokio cannot be executed by other executor/runtime, so

Expand All @@ -179,7 +184,7 @@ macro_rules! handle_request {
.fail()?
}
let handler_ctx =
HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length);
HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length, auto_create_table);
$mod_name::$handle_fn(&handler_ctx, req)
.await
.map_err(|e| {
Expand Down Expand Up @@ -254,6 +259,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
self.forwarder.clone(),
self.timeout,
self.resp_compress_min_length,
self.auto_create_table,
);

let mut total_success = 0;
Expand Down Expand Up @@ -310,6 +316,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
let forwarder = self.forwarder.clone();
let timeout = self.timeout;
let resp_compress_min_length = self.resp_compress_min_length;
let auto_create_table = self.auto_create_table;

let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
self.runtimes.read_runtime.spawn(async move {
Expand All @@ -321,6 +328,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
forwarder,
timeout,
resp_compress_min_length,
auto_create_table
);
let query_req = request.into_inner();
let output = sql_query::fetch_query_output(&handler_ctx, &query_req)
Expand Down
62 changes: 49 additions & 13 deletions server/src/grpc/storage_service/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ use crate::{
instance::InstanceRef,
};

#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
pub deadline: Option<Instant>,
pub catalog: String,
pub schema: String,
pub auto_create_table: bool,
}

impl WriteContext {
pub fn new(
request_id: RequestId,
deadline: Option<Instant>,
catalog: String,
schema: String,
) -> Self {
let auto_create_table = true;
Self {
request_id,
deadline,
catalog,
schema,
auto_create_table,
}
}
}
pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
ctx: &HandlerContext<'_, Q>,
req: WriteRequest,
Expand Down Expand Up @@ -70,14 +96,19 @@ pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
req.table_requests.len(),
);

let plan_vec = write_request_to_insert_plan(
let write_context = WriteContext {
request_id,
catalog,
&schema,
deadline,
catalog: catalog.to_string(),
schema: schema.to_string(),
auto_create_table: ctx.auto_create_table,
};

let plan_vec = write_request_to_insert_plan(
ctx.instance.clone(),
req.table_requests,
schema_config,
deadline,
write_context,
)
.await?;

Expand Down Expand Up @@ -169,35 +200,40 @@ pub async fn execute_plan<Q: QueryExecutor + 'static>(
}

pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
request_id: RequestId,
catalog: &str,
schema: &str,
instance: InstanceRef<Q>,
table_requests: Vec<WriteTableRequest>,
schema_config: Option<&SchemaConfig>,
deadline: Option<Instant>,
write_context: WriteContext,
) -> Result<Vec<InsertPlan>> {
let WriteContext {
request_id,
catalog,
schema,
deadline,
auto_create_table,
} = write_context;

let mut plan_vec = Vec::with_capacity(table_requests.len());

for write_table_req in table_requests {
let table_name = &write_table_req.table;
let mut table = try_get_table(catalog, schema, instance.clone(), table_name)?;
let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;

if table.is_none() {
if table.is_none() && auto_create_table {
// TODO: remove this clone?
let schema_config = schema_config.cloned().unwrap_or_default();
create_table(
request_id,
catalog,
schema,
&catalog,
&schema,
instance.clone(),
&write_table_req,
&schema_config,
deadline,
)
.await?;
// try to get table again
table = try_get_table(catalog, schema, instance.clone(), table_name)?;
table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
}

match table {
Expand Down
13 changes: 7 additions & 6 deletions server/src/handlers/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use warp::reject;

use crate::{
context::RequestContext, handlers, instance::InstanceRef,
schema_config_provider::SchemaConfigProviderRef,
context::RequestContext, grpc::storage_service::write::WriteContext, handlers,
instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef,
};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -242,14 +242,15 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
.schema_config_provider
.schema_config(schema)
.context(SchemaError)?;

let write_context =
WriteContext::new(request_id, deadline, catalog.clone(), schema.clone());

let plans = crate::grpc::storage_service::write::write_request_to_insert_plan(
request_id,
catalog,
schema,
self.instance.clone(),
Self::convert_write_request(req)?,
schema_config,
deadline,
write_context,
)
.await
.context(GRPCWriteError)?;
Expand Down
1 change: 1 addition & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.schema_config_provider(provider)
.forward_config(self.config.forward)
.timeout(self.config.timeout.map(|v| v.0))
.auto_create_table(self.config.auto_create_table)
.build()
.context(BuildGrpcService)?;

Expand Down