fix: table name is normalized when finding timestamp column #1446

Merged (5 commits, Jan 22, 2024)
Changes from 4 commits
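Summary of the change, as the diff below shows: `QueryPlan::find_timestamp_column` used to look the table up with `table_name.into()`, which normalizes the identifier (per the PR title, e.g. by lower-casing) before the container lookup, so tables created with quoted upper-case names such as the new `TEST_QUERY_PRIORITY` test table were never found and time-range/priority extraction silently fell back to defaults. The fix resolves the name with `get_table_ref`, turns a failed lookup into an explicit `InvalidQueryPlan` error, and makes `decide_query_priority`/`query_range` return `Result` so callers must handle the failure. The sketch below is a minimal, self-contained illustration of that lookup pitfall; `Container`, `get_normalized`, and `get_verbatim` are hypothetical stand-ins, not the HoraeDB API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the table container: keys are stored exactly as
/// the table was created (quoted identifiers keep their case).
struct Container(HashMap<String, &'static str>);

impl Container {
    /// Lossy lookup: normalizes the name first, so `TEST_QUERY_PRIORITY`
    /// (stored verbatim) is looked up as `test_query_priority` and missed.
    fn get_normalized(&self, name: &str) -> Option<&&'static str> {
        self.0.get(&name.to_lowercase())
    }

    /// Fixed lookup: resolve the reference without rewriting its case,
    /// mirroring what the switch to `get_table_ref` achieves in the diff.
    fn get_verbatim(&self, name: &str) -> Option<&&'static str> {
        self.0.get(name)
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert("TEST_QUERY_PRIORITY".to_string(), "timestamp column: TS");
    let container = Container(tables);

    // The buggy path silently returns None, so no timestamp column is found.
    assert!(container.get_normalized("TEST_QUERY_PRIORITY").is_none());
    // The fixed path finds the table and therefore its timestamp column.
    assert!(container.get_verbatim("TEST_QUERY_PRIORITY").is_some());
}
```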
34 changes: 34 additions & 0 deletions integration_tests/cases/env/local/ddl/query-plan.result
@@ -118,6 +118,36 @@ plan_type,plan,
String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=0\n num_ssts=1\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_sst_1, fetched_columns:[t,name]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=408\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"),


CREATE TABLE `TEST_QUERY_PRIORITY` (
NAME string TAG,
VALUE double NOT NULL,
TS timestamp NOT NULL,
timestamp KEY (TS)) ENGINE = Analytic WITH (
enable_ttl = 'false',
segment_duration = '2h',
update_mode = 'append'
);

affected_rows: 0

-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000 and TS < 1695348002000;

plan_type,plan,
String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=High, metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None), TS < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=false\n=0]\n"),


-- This query should have lower priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000;

plan_type,plan,
String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n=0]\n"),


DROP TABLE `03_dml_select_real_time_range`;

affected_rows: 0
@@ -126,3 +156,7 @@ DROP TABLE `03_append_mode_table`;

affected_rows: 0

DROP TABLE `TEST_QUERY_PRIORITY`;

affected_rows: 0

21 changes: 21 additions & 0 deletions integration_tests/cases/env/local/ddl/query-plan.sql
@@ -77,5 +77,26 @@ where t >= 1695348001000 and name = 'ceresdb';
explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';

CREATE TABLE `TEST_QUERY_PRIORITY` (
NAME string TAG,
VALUE double NOT NULL,
TS timestamp NOT NULL,
timestamp KEY (TS)) ENGINE = Analytic WITH (
enable_ttl = 'false',
segment_duration = '2h',
update_mode = 'append'
);

-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000 and TS < 1695348002000;

-- This query should have lower priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000;

DROP TABLE `03_dml_select_real_time_range`;
DROP TABLE `03_append_mode_table`;
DROP TABLE `TEST_QUERY_PRIORITY`;
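For context, the two new queries above differ only in whether the scanned time range is bounded: the bounded one is expected to run at priority=High and the open-ended one at priority=Low, as the .result file above shows. The sketch below is a hypothetical illustration of such a threshold rule; the real comparison lives in `QueryPlan::decide_query_priority`, which is only partially visible in this diff, so the exact condition may differ. The 3_600_000 ms threshold is an assumed stand-in for the proxy's `expensive_query_threshold`.

```rust
/// Hypothetical priority classifier: a query is treated as expensive (Low
/// priority) when its time range is unbounded or wider than the threshold.
#[derive(Debug, PartialEq)]
enum Priority {
    High,
    Low,
}

fn classify(start_ms: i64, end_ms: Option<i64>, threshold_ms: i64) -> Priority {
    match end_ms {
        // Bounded range: compare its width against the threshold.
        Some(end) if end.saturating_sub(start_ms) <= threshold_ms => Priority::High,
        // Unbounded or too wide: expensive query.
        _ => Priority::Low,
    }
}

fn main() {
    // `TS >= 1695348001000 and TS < 1695348002000` spans one second -> High.
    assert_eq!(
        classify(1_695_348_001_000, Some(1_695_348_002_000), 3_600_000),
        Priority::High
    );
    // `TS >= 1695348001000` has no upper bound -> Low.
    assert_eq!(classify(1_695_348_001_000, None, 3_600_000), Priority::Low);
}
```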
28 changes: 12 additions & 16 deletions integration_tests/cases/env/local/system/system_tables.result
@@ -12,22 +12,18 @@ CREATE TABLE `01_system_table1` (

affected_rows: 0

-- FIXME
SELECT
`timestamp`,
`catalog`,
`schema`,
`table_name`,
`engine`
FROM
system.public.tables
WHERE
table_name = '01_system_table1';

timestamp,catalog,schema,table_name,engine,
Timestamp(0),String("horaedb"),String("public"),String("01_system_table1"),String("Analytic"),


-- TODO: querying a table in the system catalog currently throws an error:
-- Couldn't find table in table container
-- SELECT
-- `timestamp`,
-- `catalog`,
-- `schema`,
-- `table_name`,
-- `engine`
-- FROM
-- system.public.tables
-- WHERE
-- table_name = '01_system_table1';
-- FIXME
SHOW TABLES LIKE '01%';

23 changes: 12 additions & 11 deletions integration_tests/cases/env/local/system/system_tables.sql
@@ -10,17 +10,18 @@ CREATE TABLE `01_system_table1` (
timestamp KEY (timestamp)) ENGINE=Analytic;


-- FIXME
SELECT
`timestamp`,
`catalog`,
`schema`,
`table_name`,
`engine`
FROM
system.public.tables
WHERE
table_name = '01_system_table1';
-- TODO: querying a table in the system catalog currently throws an error:
-- Couldn't find table in table container
-- SELECT
-- `timestamp`,
-- `catalog`,
-- `schema`,
-- `table_name`,
-- `engine`
-- FROM
-- system.public.tables
-- WHERE
-- table_name = '01_system_table1';


-- FIXME
13 changes: 10 additions & 3 deletions src/interpreters/src/select.rs
@@ -83,9 +83,16 @@ impl Interpreter for SelectInterpreter {
async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
let request_id = self.ctx.request_id();
let plan = self.plan;
let priority = match plan.decide_query_priority(PriorityContext {
time_range_threshold: self.ctx.expensive_query_threshold(),
}) {
let priority = match plan
.decide_query_priority(PriorityContext {
time_range_threshold: self.ctx.expensive_query_threshold(),
})
.box_err()
.with_context(|| ExecutePlan {
msg: format!("decide query priority failed, id:{request_id}"),
})
.context(Select)?
{
Some(v) => v,
None => {
debug!(
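Because `decide_query_priority` is now fallible, `SelectInterpreter::execute` above boxes the error and attaches a request-scoped message before converting it into the interpreter's `Select` error. The snippet below is a simplified, hypothetical version of that wrapping using plain `std` errors instead of the crate's `snafu` contexts; `ExecutePlanError` and the `decide_query_priority` stub here are stand-ins, not the real signatures.

```rust
use std::fmt;

/// Hypothetical error type playing the role of the interpreter's ExecutePlan error.
#[derive(Debug)]
struct ExecutePlanError {
    msg: String,
}

impl fmt::Display for ExecutePlanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to execute plan, msg:{}", self.msg)
    }
}

/// Stand-in for the plan call that can now fail (e.g. table lookup error).
fn decide_query_priority(request_id: u64) -> Result<Option<&'static str>, String> {
    Err(format!("Couldn't find table in table container, id:{request_id}"))
}

fn execute(request_id: u64) -> Result<&'static str, ExecutePlanError> {
    // Mirror of the diff: wrap the underlying error with request-scoped context
    // before propagating it to the caller.
    let priority = decide_query_priority(request_id).map_err(|e| ExecutePlanError {
        msg: format!("decide query priority failed, id:{request_id}, source:{e}"),
    })?;
    Ok(priority.unwrap_or("default"))
}

fn main() {
    let err = execute(42).unwrap_err();
    println!("{err}");
}
```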
10 changes: 9 additions & 1 deletion src/proxy/src/limiter.rs
@@ -18,6 +18,7 @@
use std::{collections::HashSet, str::FromStr, sync::RwLock};

use datafusion::logical_expr::logical_plan::LogicalPlan;
use logger::error;
use macros::define_result;
use query_frontend::plan::Plan;
use serde::{Deserialize, Serialize};
@@ -74,7 +75,14 @@ impl BlockRule {
BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
BlockRule::QueryRange(threshold) => {
if let Plan::Query(plan) = plan {
if let Some(range) = plan.query_range() {
let range = match plan.query_range() {
Ok(v) => v,
Err(e) => {
error!("Find query range failed, err:{e}");
return false;
}
};
if let Some(range) = range {
if range > *threshold {
return true;
}
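The limiter handles the now-fallible `query_range()` fail-open: if the range cannot be determined, the error is logged and the query is not blocked, rather than being rejected outright. A minimal sketch of that decision, with a hypothetical `PlanError` type standing in for the real plan error, is:

```rust
/// Hypothetical stand-in for the plan error returned by `query_range()`.
#[derive(Debug)]
struct PlanError(String);

/// Fail-open check: only block when the range is known and exceeds the threshold.
fn should_block(range: Result<Option<i64>, PlanError>, threshold: i64) -> bool {
    match range {
        Ok(Some(range)) => range > threshold,
        // Range unknown (e.g. no valid time range): do not block.
        Ok(None) => false,
        // Extraction failed: log and do not block, mirroring the limiter change.
        Err(e) => {
            eprintln!("Find query range failed, err:{e:?}");
            false
        }
    }
}

fn main() {
    assert!(should_block(Ok(Some(10_000)), 1_000));
    assert!(!should_block(Ok(None), 1_000));
    assert!(!should_block(Err(PlanError("table not found".into())), 1_000));
}
```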
12 changes: 9 additions & 3 deletions src/proxy/src/read.rs
@@ -248,9 +248,15 @@ impl Proxy {
}

if let Plan::Query(plan) = &plan {
if let Some(priority) = plan.decide_query_priority(PriorityContext {
time_range_threshold: self.expensive_query_threshold,
}) {
if let Some(priority) = plan
.decide_query_priority(PriorityContext {
time_range_threshold: self.expensive_query_threshold,
})
.box_err()
.context(Internal {
msg: format!("Decide query priority failed, table_name:{table_name:?}"),
})?
{
slow_timer.priority(priority);
}
}
62 changes: 40 additions & 22 deletions src/query_frontend/src/plan.rs
@@ -36,10 +36,10 @@ use datafusion::{
use logger::{debug, warn};
use macros::define_result;
use runtime::Priority;
use snafu::Snafu;
use snafu::{OptionExt, Snafu};
use table_engine::{partition::PartitionInfo, table::TableRef};

use crate::{ast::ShowCreateObject, container::TableContainer};
use crate::{ast::ShowCreateObject, container::TableContainer, planner::get_table_ref};

#[derive(Debug, Snafu)]
pub enum Error {
@@ -54,6 +54,9 @@ pub enum Error {

#[snafu(display("Alter primary key is not allowed."))]
AlterPrimaryKey,

#[snafu(display("Query plan is invalid, msg:{msg}."))]
InvalidQueryPlan { msg: String },
}

define_result!(Error);
@@ -109,12 +112,22 @@ pub struct QueryPlan {
}

impl QueryPlan {
fn find_timestamp_column(&self) -> Option<Column> {
let table_name = self.table_name.as_ref()?;
let table_ref = self.tables.get(table_name.into())?;
fn find_timestamp_column(&self) -> Result<Option<Column>> {
let table_name = match self.table_name.as_ref() {
Some(v) => v,
None => {
return Ok(None);
}
};
let table_ref = self
.tables
.get(get_table_ref(table_name))
.with_context(|| InvalidQueryPlan {
msg: format!("Couldn't find table in table container, name:{table_name}"),
})?;
let schema = table_ref.table.schema();
let timestamp_name = schema.timestamp_name();
Some(Column::from_name(timestamp_name))
Ok(Some(Column::from_name(timestamp_name)))
}

/// This function is used to extract time range from the query plan.
@@ -125,15 +138,15 @@ impl QueryPlan {
/// Note: when the timestamp filter evaluates to false (such as ts < 10 and
/// ts > 100), it will return None, which means there is no valid time range
/// for this query.
fn extract_time_range(&self) -> Option<TimeRange> {
let ts_column = if let Some(v) = self.find_timestamp_column() {
fn extract_time_range(&self) -> Result<Option<TimeRange>> {
let ts_column = if let Some(v) = self.find_timestamp_column()? {
v
} else {
warn!(
"Couldn't find time column, plan:{:?}, table_name:{:?}",
self.df_plan, self.table_name
);
return Some(TimeRange::min_to_max());
return Ok(Some(TimeRange::min_to_max()));
};
let time_range = match influxql_query::logical_optimizer::range_predicate::find_time_range(
&self.df_plan,
@@ -145,10 +158,9 @@ impl QueryPlan {
"Couldn't find time range, plan:{:?}, err:{}",
self.df_plan, e
);
return Some(TimeRange::min_to_max());
return Ok(Some(TimeRange::min_to_max()));
}
};

debug!(
"Extract time range, value:{time_range:?}, plan:{:?}",
self.df_plan
@@ -190,16 +202,20 @@ impl QueryPlan {
Bound::Unbounded => {}
}

TimeRange::new(start.into(), end.into())
Ok(TimeRange::new(start.into(), end.into()))
}

/// Decide the query priority based on the query plan.
/// When the query contains an invalid time range, it will return None.
// TODO: Currently we only consider the time range; other factors, such as the
// number of series or slow-log metrics, should also be considered.
pub fn decide_query_priority(&self, ctx: PriorityContext) -> Option<Priority> {
pub fn decide_query_priority(&self, ctx: PriorityContext) -> Result<Option<Priority>> {
let threshold = ctx.time_range_threshold;
let time_range = self.extract_time_range()?;
let time_range = match self.extract_time_range()? {
Some(v) => v,
// When there is no valid time range, we can't decide its priority.
None => return Ok(None),
};
let is_expensive = if let Some(v) = time_range
.exclusive_end()
.as_i64()
@@ -217,18 +233,20 @@ impl QueryPlan {
Priority::High
};

Some(priority)
Ok(Some(priority))
}

/// When the query contains an invalid time range such as `[200, 100]`, it
/// will return None.
pub fn query_range(&self) -> Option<i64> {
pub fn query_range(&self) -> Result<Option<i64>> {
self.extract_time_range().map(|time_range| {
time_range
.exclusive_end()
.as_i64()
.checked_sub(time_range.inclusive_start().as_i64())
.unwrap_or(i64::MAX)
time_range.map(|time_range| {
time_range
.exclusive_end()
.as_i64()
.checked_sub(time_range.inclusive_start().as_i64())
.unwrap_or(i64::MAX)
})
})
}
}
@@ -429,7 +447,7 @@ mod tests {
.1
.map(|v| TimeRange::new_unchecked(v.0.into(), v.1.into()));

assert_eq!(plan.extract_time_range(), expected, "sql:{}", sql);
assert_eq!(plan.extract_time_range().unwrap(), expected, "sql:{}", sql);
}
}
}
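One behavioral detail worth noting in `extract_time_range` above: when the timestamp column or the range predicate cannot be analyzed, the method falls back to `TimeRange::min_to_max()` (the query is treated as scanning everything and is then classified as expensive), whereas a filter that can never match yields `Ok(None)` and, per `decide_query_priority`, the priority stays undecided. The sketch below mirrors that three-way outcome with hypothetical stand-in types; it is not the real `TimeRange` API.

```rust
/// Hypothetical stand-ins for the three outcomes of time-range extraction.
#[derive(Debug, PartialEq)]
enum Extracted {
    /// A concrete [start, end) range in milliseconds.
    Range(i64, i64),
    /// Fallback when the column or predicate cannot be analyzed: scan everything.
    MinToMax,
    /// The filter is provably empty (e.g. `ts < 10 AND ts > 100`).
    None,
}

fn extract(range: Option<(i64, i64)>, analysis_ok: bool) -> Extracted {
    if !analysis_ok {
        // Mirrors `return Ok(Some(TimeRange::min_to_max()))` in the diff.
        return Extracted::MinToMax;
    }
    match range {
        Some((start, end)) if start < end => Extracted::Range(start, end),
        // Mirrors the `Ok(None)` case: no valid time range for this query.
        _ => Extracted::None,
    }
}

fn main() {
    assert_eq!(extract(Some((10, 100)), true), Extracted::Range(10, 100));
    assert_eq!(extract(Some((100, 10)), true), Extracted::None);
    assert_eq!(extract(None, false), Extracted::MinToMax);
}
```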