Skip to content

Commit

Permalink
fix: table name is normalized when finding timestamp column (#1446)
Browse files Browse the repository at this point in the history
## Rationale


## Detailed Changes
- Use `get_table_ref` to get `TableReference`, which will not normalize
table name
- Return error when timestamp column is not found.

## Test Plan
CI
  • Loading branch information
jiacai2050 authored Jan 22, 2024
1 parent a5684a8 commit 264826e
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 60 deletions.
34 changes: 34 additions & 0 deletions integration_tests/cases/env/local/ddl/query-plan.result
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,36 @@ plan_type,plan,
String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=0\n num_ssts=1\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_sst_1, fetched_columns:[t,name]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=408\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"),


CREATE TABLE `TEST_QUERY_PRIORITY` (
NAME string TAG,
VALUE double NOT NULL,
TS timestamp NOT NULL,
timestamp KEY (TS)) ENGINE = Analytic WITH (
enable_ttl = 'false',
segment_duration = '2h',
update_mode = 'append'
);

affected_rows: 0

-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000 and TS < 1695348002000;

plan_type,plan,
String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=High, metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None), TS < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=false\n=0]\n"),


-- This query should have lower priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000;

plan_type,plan,
String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n=0]\n"),


DROP TABLE `03_dml_select_real_time_range`;

affected_rows: 0
Expand All @@ -126,3 +156,7 @@ DROP TABLE `03_append_mode_table`;

affected_rows: 0

DROP TABLE `TEST_QUERY_PRIORITY`;

affected_rows: 0

21 changes: 21 additions & 0 deletions integration_tests/cases/env/local/ddl/query-plan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,26 @@ where t >= 1695348001000 and name = 'ceresdb';
explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';

CREATE TABLE `TEST_QUERY_PRIORITY` (
NAME string TAG,
VALUE double NOT NULL,
TS timestamp NOT NULL,
timestamp KEY (TS)) ENGINE = Analytic WITH (
enable_ttl = 'false',
segment_duration = '2h',
update_mode = 'append'
);

-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000 and TS < 1695348002000;

-- This query should have lower priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000;

DROP TABLE `03_dml_select_real_time_range`;
DROP TABLE `03_append_mode_table`;
DROP TABLE `TEST_QUERY_PRIORITY`;
28 changes: 12 additions & 16 deletions integration_tests/cases/env/local/system/system_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,18 @@ CREATE TABLE `01_system_table1` (

affected_rows: 0

-- FIXME
SELECT
`timestamp`,
`catalog`,
`schema`,
`table_name`,
`engine`
FROM
system.public.tables
WHERE
table_name = '01_system_table1';

timestamp,catalog,schema,table_name,engine,
Timestamp(0),String("horaedb"),String("public"),String("01_system_table1"),String("Analytic"),


-- TODO: when query table in system catalog, it will throw errors now
-- Couldn't find table in table container
-- SELECT
-- `timestamp`,
-- `catalog`,
-- `schema`,
-- `table_name`,
-- `engine`
-- FROM
-- system.public.tables
-- WHERE
-- table_name = '01_system_table1';
-- FIXME
SHOW TABLES LIKE '01%';

Expand Down
23 changes: 12 additions & 11 deletions integration_tests/cases/env/local/system/system_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ CREATE TABLE `01_system_table1` (
timestamp KEY (timestamp)) ENGINE=Analytic;


-- FIXME
SELECT
`timestamp`,
`catalog`,
`schema`,
`table_name`,
`engine`
FROM
system.public.tables
WHERE
table_name = '01_system_table1';
-- TODO: when query table in system catalog, it will throw errors now
-- Couldn't find table in table container
-- SELECT
-- `timestamp`,
-- `catalog`,
-- `schema`,
-- `table_name`,
-- `engine`
-- FROM
-- system.public.tables
-- WHERE
-- table_name = '01_system_table1';


-- FIXME
Expand Down
8 changes: 4 additions & 4 deletions src/components/system_stats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ mod tests {
assert!(stats.total_memory > 0);
assert!(stats.used_memory > 0);
assert!(stats.used_memory < stats.total_memory);
assert!(stats.cpu_usage > 0.0);
assert!(stats.load_avg.one > 0.0);
assert!(stats.load_avg.five > 0.0);
assert!(stats.load_avg.fifteen > 0.0);
assert!(stats.cpu_usage >= 0.0);
assert!(stats.load_avg.one >= 0.0);
assert!(stats.load_avg.five >= 0.0);
assert!(stats.load_avg.fifteen >= 0.0);
}

#[tokio::test]
Expand Down
13 changes: 10 additions & 3 deletions src/interpreters/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,16 @@ impl Interpreter for SelectInterpreter {
async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
let request_id = self.ctx.request_id();
let plan = self.plan;
let priority = match plan.decide_query_priority(PriorityContext {
time_range_threshold: self.ctx.expensive_query_threshold(),
}) {
let priority = match plan
.decide_query_priority(PriorityContext {
time_range_threshold: self.ctx.expensive_query_threshold(),
})
.box_err()
.with_context(|| ExecutePlan {
msg: format!("decide query priority failed, id:{request_id}"),
})
.context(Select)?
{
Some(v) => v,
None => {
debug!(
Expand Down
10 changes: 9 additions & 1 deletion src/proxy/src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::{collections::HashSet, str::FromStr, sync::RwLock};

use datafusion::logical_expr::logical_plan::LogicalPlan;
use logger::error;
use macros::define_result;
use query_frontend::plan::Plan;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -74,7 +75,14 @@ impl BlockRule {
BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
BlockRule::QueryRange(threshold) => {
if let Plan::Query(plan) = plan {
if let Some(range) = plan.query_range() {
let range = match plan.query_range() {
Ok(v) => v,
Err(e) => {
error!("Find query range failed, err:{e}");
return false;
}
};
if let Some(range) = range {
if range > *threshold {
return true;
}
Expand Down
12 changes: 9 additions & 3 deletions src/proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,15 @@ impl Proxy {
}

if let Plan::Query(plan) = &plan {
if let Some(priority) = plan.decide_query_priority(PriorityContext {
time_range_threshold: self.expensive_query_threshold,
}) {
if let Some(priority) = plan
.decide_query_priority(PriorityContext {
time_range_threshold: self.expensive_query_threshold,
})
.box_err()
.context(Internal {
msg: format!("Decide query priority failed, table_name:{table_name:?}"),
})?
{
slow_timer.priority(priority);
}
}
Expand Down
62 changes: 40 additions & 22 deletions src/query_frontend/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ use datafusion::{
use logger::{debug, warn};
use macros::define_result;
use runtime::Priority;
use snafu::Snafu;
use snafu::{OptionExt, Snafu};
use table_engine::{partition::PartitionInfo, table::TableRef};

use crate::{ast::ShowCreateObject, container::TableContainer};
use crate::{ast::ShowCreateObject, container::TableContainer, planner::get_table_ref};

#[derive(Debug, Snafu)]
pub enum Error {
Expand All @@ -54,6 +54,9 @@ pub enum Error {

#[snafu(display("Alter primary key is not allowed."))]
AlterPrimaryKey,

#[snafu(display("Query plan is invalid, msg:{msg}."))]
InvalidQueryPlan { msg: String },
}

define_result!(Error);
Expand Down Expand Up @@ -109,12 +112,22 @@ pub struct QueryPlan {
}

impl QueryPlan {
fn find_timestamp_column(&self) -> Option<Column> {
let table_name = self.table_name.as_ref()?;
let table_ref = self.tables.get(table_name.into())?;
fn find_timestamp_column(&self) -> Result<Option<Column>> {
let table_name = match self.table_name.as_ref() {
Some(v) => v,
None => {
return Ok(None);
}
};
let table_ref = self
.tables
.get(get_table_ref(table_name))
.with_context(|| InvalidQueryPlan {
msg: format!("Couldn't find table in table container, name:{table_name}"),
})?;
let schema = table_ref.table.schema();
let timestamp_name = schema.timestamp_name();
Some(Column::from_name(timestamp_name))
Ok(Some(Column::from_name(timestamp_name)))
}

/// This function is used to extract time range from the query plan.
Expand All @@ -125,15 +138,15 @@ impl QueryPlan {
/// Note: When it timestamp filter evals to false(such as ts < 10 and ts >
/// 100), it will return None, which means no valid time range for this
/// query.
fn extract_time_range(&self) -> Option<TimeRange> {
let ts_column = if let Some(v) = self.find_timestamp_column() {
fn extract_time_range(&self) -> Result<Option<TimeRange>> {
let ts_column = if let Some(v) = self.find_timestamp_column()? {
v
} else {
warn!(
"Couldn't find time column, plan:{:?}, table_name:{:?}",
self.df_plan, self.table_name
);
return Some(TimeRange::min_to_max());
return Ok(Some(TimeRange::min_to_max()));
};
let time_range = match influxql_query::logical_optimizer::range_predicate::find_time_range(
&self.df_plan,
Expand All @@ -145,10 +158,9 @@ impl QueryPlan {
"Couldn't find time range, plan:{:?}, err:{}",
self.df_plan, e
);
return Some(TimeRange::min_to_max());
return Ok(Some(TimeRange::min_to_max()));
}
};

debug!(
"Extract time range, value:{time_range:?}, plan:{:?}",
self.df_plan
Expand Down Expand Up @@ -190,16 +202,20 @@ impl QueryPlan {
Bound::Unbounded => {}
}

TimeRange::new(start.into(), end.into())
Ok(TimeRange::new(start.into(), end.into()))
}

/// Decide the query priority based on the query plan.
/// When query contains invalid time range, it will return None.
// TODO: Currently we only consider the time range, consider other factors, such
// as the number of series, or slow log metrics.
pub fn decide_query_priority(&self, ctx: PriorityContext) -> Option<Priority> {
pub fn decide_query_priority(&self, ctx: PriorityContext) -> Result<Option<Priority>> {
let threshold = ctx.time_range_threshold;
let time_range = self.extract_time_range()?;
let time_range = match self.extract_time_range()? {
Some(v) => v,
// When there is no valid time range, we can't decide its priority.
None => return Ok(None),
};
let is_expensive = if let Some(v) = time_range
.exclusive_end()
.as_i64()
Expand All @@ -217,18 +233,20 @@ impl QueryPlan {
Priority::High
};

Some(priority)
Ok(Some(priority))
}

/// When query contains invalid time range such as `[200, 100]`, it will
/// return None.
pub fn query_range(&self) -> Option<i64> {
pub fn query_range(&self) -> Result<Option<i64>> {
self.extract_time_range().map(|time_range| {
time_range
.exclusive_end()
.as_i64()
.checked_sub(time_range.inclusive_start().as_i64())
.unwrap_or(i64::MAX)
time_range.map(|time_range| {
time_range
.exclusive_end()
.as_i64()
.checked_sub(time_range.inclusive_start().as_i64())
.unwrap_or(i64::MAX)
})
})
}
}
Expand Down Expand Up @@ -429,7 +447,7 @@ mod tests {
.1
.map(|v| TimeRange::new_unchecked(v.0.into(), v.1.into()));

assert_eq!(plan.extract_time_range(), expected, "sql:{}", sql);
assert_eq!(plan.extract_time_range().unwrap(), expected, "sql:{}", sql);
}
}
}

0 comments on commit 264826e

Please sign in to comment.