Skip to content

Commit

Permalink
fix: http query support querying partitioned sub-table (#932)
Browse files Browse the repository at this point in the history
## Related Issues
Closes #

## Detailed Changes
The http api supports querying sub-tables of partition table.

## Test Plan 
Manual test.
  • Loading branch information
chunshao90 authored May 25, 2023
1 parent b2b21e1 commit d62e414
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 11 deletions.
1 change: 1 addition & 0 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let ctx = ProxyContext {
runtime: self.engine_runtimes.write_runtime.clone(),
timeout: ctx.timeout,
enable_partition_table_access: false,
};

let result = self.handle_write_internal(ctx, table_request).await?;
Expand Down
1 change: 1 addition & 0 deletions proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.read_runtime.clone(),
enable_partition_table_access: true,
};

match self.handle_sql(context, &ctx.schema, &req.query).await? {
Expand Down
1 change: 1 addition & 0 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
};
let result = self
.handle_write_internal(proxy_context, table_request)
Expand Down
52 changes: 49 additions & 3 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ use ceresdbproto::storage::{
use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID};
use common_util::{error::BoxError, runtime::Runtime};
use futures::FutureExt;
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
use interpreters::{
context::Context as InterpreterContext,
factory::Factory,
interpreter::{InterpreterPtr, Output},
};
use log::{error, info};
use query_engine::executor::Executor as QueryExecutor;
use query_frontend::plan::Plan;
Expand Down Expand Up @@ -362,23 +366,64 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
msg: "Request is blocked",
})?;

let interpreter =
self.build_interpreter(request_id, catalog, schema, plan, deadline, false)?;
Self::interpreter_execute_plan(interpreter, deadline).await
}

async fn execute_plan_involving_partition_table(
&self,
request_id: RequestId,
catalog: &str,
schema: &str,
plan: Plan,
deadline: Option<Instant>,
) -> Result<Output> {
self.instance
.limiter
.try_limit(&plan)
.box_err()
.context(Internal {
msg: "Request is blocked",
})?;

let interpreter =
self.build_interpreter(request_id, catalog, schema, plan, deadline, true)?;
Self::interpreter_execute_plan(interpreter, deadline).await
}

fn build_interpreter(
&self,
request_id: RequestId,
catalog: &str,
schema: &str,
plan: Plan,
deadline: Option<Instant>,
enable_partition_table_access: bool,
) -> Result<InterpreterPtr> {
let interpreter_ctx = InterpreterContext::builder(request_id, deadline)
// Use current ctx's catalog and schema as default catalog and schema
.default_catalog_and_schema(catalog.to_string(), schema.to_string())
.enable_partition_table_access(enable_partition_table_access)
.build();
let interpreter_factory = Factory::new(
self.instance.query_executor.clone(),
self.instance.catalog_manager.clone(),
self.instance.table_engine.clone(),
self.instance.table_manipulator.clone(),
);
let interpreter = interpreter_factory
interpreter_factory
.create(interpreter_ctx, plan)
.box_err()
.context(Internal {
msg: "Failed to create interpreter",
})?;
})
}

async fn interpreter_execute_plan(
interpreter: InterpreterPtr,
deadline: Option<Instant>,
) -> Result<Output> {
if let Some(deadline) = deadline {
tokio::time::timeout_at(
tokio::time::Instant::from_std(deadline),
Expand Down Expand Up @@ -406,4 +451,5 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub struct Context {
pub timeout: Option<Duration>,
pub runtime: Arc<Runtime>,
pub enable_partition_table_access: bool,
}
19 changes: 11 additions & 8 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,17 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
msg: format!("Failed to create plan, query:{sql}"),
})?;

let output = self
.execute_plan(request_id, catalog, schema, plan, deadline)
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to execute plan, sql:{sql}"),
})?;
let output = if ctx.enable_partition_table_access {
self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline)
.await
} else {
self.execute_plan(request_id, catalog, schema, plan, deadline)
.await
};
let output = output.box_err().with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to execute plan, sql:{sql}"),
})?;

let cost = begin_instant.saturating_elapsed();
info!("Handle sql query success, catalog:{catalog}, schema:{schema}, request_id:{request_id}, cost:{cost:?}, sql:{sql:?}");
Expand Down
6 changes: 6 additions & 0 deletions server/src/grpc/storage_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
};
let stream = Self::stream_sql_query_internal(ctx, proxy, req).await;

Expand All @@ -159,6 +160,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
};

let join_handle = self
Expand Down Expand Up @@ -189,6 +191,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
let ctx = Context {
runtime: self.runtimes.write_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
};

let join_handle = self.runtimes.write_runtime.spawn(async move {
Expand Down Expand Up @@ -228,6 +231,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
};
let join_handle = self
.runtimes
Expand Down Expand Up @@ -290,6 +294,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
};
let join_handle = self.runtimes.read_runtime.spawn(async move {
if req.context.is_none() {
Expand Down Expand Up @@ -329,6 +334,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
let ctx = Context {
runtime: self.runtimes.write_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
};

let join_handle = self.runtimes.write_runtime.spawn(async move {
Expand Down

0 comments on commit d62e414

Please sign in to comment.