Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wrong primary key when define tsid and timestamp key as primary key #453

Merged
merged 6 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 178 additions & 104 deletions sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use hashbrown::HashMap as NoStdHashMap;
use log::debug;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use sqlparser::ast::{
ColumnDef, ColumnOption, Expr, ObjectName, Query, SetExpr, SqlOption,
ColumnDef, ColumnOption, Expr, Ident, ObjectName, Query, SetExpr, SqlOption,
Statement as SqlStatement, TableConstraint, UnaryOperator, Value, Values,
};
use table_engine::table::TableRef;
Expand Down Expand Up @@ -100,14 +100,33 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Undefined column is used in primary key, column name:{}.\nBacktrace:\n{}",
name,
backtrace
))]
UndefinedColumnInPrimaryKey { name: String, backtrace: Backtrace },

#[snafu(display("Primary key not found, column name:{}", name))]
PrimaryKeyNotFound { name: String },

#[snafu(display(
"Duplicate definitions of primary key are found, first:{:?}, second:{:?}.\nBacktrace:\n{:?}",
first,
second,
backtrace,
))]
DuplicatePrimaryKey {
first: Vec<Ident>,
second: Vec<Ident>,
backtrace: Backtrace,
},

#[snafu(display("Tag column not found, name:{}", name))]
TagColumnNotFound { name: String },

#[snafu(display(
"Timestamp key column can not be tag, name:{}.\nBactrace:\n{:?}",
"Timestamp key column can not be tag, name:{}.\nBacktrace:\n{:?}",
name,
backtrace
))]
Expand Down Expand Up @@ -214,6 +233,8 @@ pub enum Error {

define_result!(Error);

const DEFAULT_QUOTE_CHAR: char = '`';

/// Planner produces logical plans from SQL AST
// TODO(yingwen): Rewrite Planner instead of using datafusion's planner
pub struct Planner<'a, P: MetaProvider> {
Expand Down Expand Up @@ -322,143 +343,196 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
})
}

fn create_table_to_plan(&self, stmt: CreateTable) -> Result<Plan> {
ensure!(!stmt.table_name.is_empty(), CreateTableNameEmpty);

debug!("Create table to plan, stmt:{:?}", stmt);

// TODO(yingwen): Maybe support create table on other schema?
let table_name = stmt.table_name.to_string();
let table_ref = TableReference::from(table_name.as_str());

// Now we only takes the table name and ignore the schema and catalog name
let table = table_ref.table().to_string();

let mut schema_builder =
schema::Builder::with_capacity(stmt.columns.len()).auto_increment_column_id(true);

// Build all column schemas.
let mut name_column_map = stmt
.columns
.iter()
.map(|col| Ok((col.name.value.as_str(), parse_column(col)?)))
.collect::<Result<BTreeMap<_, _>>>()?;

let name_column_index_map = stmt
.columns
.iter()
.enumerate()
.map(|(idx, col)| (col.name.value.as_str(), idx))
.collect::<BTreeMap<_, _>>();
fn create_table_schema(
columns: &[Ident],
primary_key_columns: &[Ident],
mut columns_by_name: HashMap<&str, ColumnSchema>,
column_idxs_by_name: HashMap<&str, usize>,
enable_tsid_primary_key: bool,
) -> Result<Schema> {
assert_eq!(columns_by_name.len(), column_idxs_by_name.len());

let mut schema_builder = schema::Builder::with_capacity(columns_by_name.len())
.auto_increment_column_id(true)
.enable_tsid_primary_key(enable_tsid_primary_key);

// Collect the key columns.
for key_col in primary_key_columns {
let col_name = key_col.value.as_str();
let col = columns_by_name
.remove(col_name)
.context(UndefinedColumnInPrimaryKey { name: col_name })?;
schema_builder = schema_builder
.add_key_column(col)
.context(BuildTableSchema)?;
}

// Tsid column is a reserved column.
ensure!(
!name_column_map.contains_key(TSID_COLUMN),
ColumnNameReserved {
name: TSID_COLUMN.to_string(),
// Collect the normal columns.
for normal_col in columns {
let col_name = normal_col.value.as_str();
// Only normal columns are kept in the `columns_by_name`.
if let Some(col) = columns_by_name.remove(col_name) {
schema_builder = schema_builder
.add_normal_column(col)
.context(BuildTableSchema)?;
}
);

// Find timestamp key and primary key contraint

let mut timestamp_column_idx = None;
let mut timestamp_name = None;
}

let mut primary_key_column_idxs = vec![];
schema_builder.build().context(BuildTableSchema)
}

for constraint in stmt.constraints.iter() {
// Find the primary key columns and ensure at most only one exists.
fn find_and_ensure_primary_key_columns(
constraints: &[TableConstraint],
) -> Result<Option<Vec<Ident>>> {
let mut primary_key_columns: Option<Vec<Ident>> = None;
for constraint in constraints {
if let TableConstraint::Unique {
columns,
is_primary,
..
} = constraint
{
if *is_primary {
// Build primary key, the builder will check timestamp column is in primary key.
for column in columns {
if let Some(idx) = name_column_index_map.get(&*column.value) {
primary_key_column_idxs.push(*idx);
ensure!(
primary_key_columns.is_none(),
DuplicatePrimaryKey {
first: primary_key_columns.unwrap(),
second: columns.clone()
}
}
} else if parser::is_timestamp_key_constraint(constraint) {
);
primary_key_columns = Some(columns.clone());
}
}
}

Ok(primary_key_columns)
}

// Find the timestamp column and ensure its valid existence (only one).
fn find_and_ensure_timestamp_column(
columns_by_name: &HashMap<&str, ColumnSchema>,
constraints: &[TableConstraint],
) -> Result<Ident> {
let mut timestamp_column_name = None;
for constraint in constraints {
if let TableConstraint::Unique { columns, .. } = constraint {
if parser::is_timestamp_key_constraint(constraint) {
// Only one timestamp key constraint
ensure!(timestamp_column_idx.is_none(), InvalidTimestampKey);
ensure!(timestamp_column_name.is_none(), InvalidTimestampKey);
// Only one column in constraint
ensure!(columns.len() == 1, InvalidTimestampKey);
let timestamp_ident = columns[0].clone();

let name = &columns[0].value;
let timestamp_column = name_column_map
.get(name as &str)
.context(TimestampColumnNotFound { name })?;
// Ensure type is timestamp
let timestamp_column = columns_by_name
.get(timestamp_ident.value.as_str())
.context(TimestampColumnNotFound {
name: &timestamp_ident.value,
})?;

// Ensure the timestamp key's type is timestamp.
ensure!(
timestamp_column.data_type == DatumKind::Timestamp,
InvalidTimestampKey
);
let column_idx = name_column_index_map
.get(name as &str)
.context(TimestampColumnNotFound { name })?;
// Ensure the timestamp key is not a tag.
ensure!(
!timestamp_column.is_tag,
TimestampKeyTag {
name: &timestamp_ident.value,
}
);

timestamp_column_idx = Some(*column_idx);
timestamp_name = Some(name.clone());
timestamp_column_name = Some(timestamp_ident);
}
}
}

// Timestamp column must be provided.
let timestamp_name = timestamp_name.context(RequireTimestamp)?;
// The timestamp key column must not be a Tag column
if let Some(timestamp_column) = name_column_map.get(&timestamp_name.as_str()) {
ensure!(
!timestamp_column.is_tag,
TimestampKeyTag {
name: &timestamp_name,
}
)
}
timestamp_column_name.context(RequireTimestamp)
}

let timestamp_col_idx = timestamp_column_idx.context(RequireTimestamp)?;
// The key columns have been consumed.
for (idx, col) in stmt.columns.iter().enumerate() {
let col_name = col.name.value.as_str();
if let Some(col) = name_column_map.remove(col_name) {
if !primary_key_column_idxs.is_empty() {
if primary_key_column_idxs.contains(&idx) {
let key_column = if TSID_COLUMN == col.name {
schema_builder = schema_builder.enable_tsid_primary_key(true);
Self::tsid_column_schema()?
} else {
col
};
schema_builder = schema_builder
.add_key_column(key_column)
.context(BuildTableSchema)?;
continue;
}
} else if timestamp_col_idx == idx {
// If primary key is not set, Use (timestamp, tsid) as primary key.
schema_builder = schema_builder
.enable_tsid_primary_key(true)
.add_key_column(col)
.context(BuildTableSchema)?
.add_key_column(Self::tsid_column_schema()?)
.context(BuildTableSchema)?;
continue;
}
fn create_table_to_plan(&self, stmt: CreateTable) -> Result<Plan> {
ensure!(!stmt.table_name.is_empty(), CreateTableNameEmpty);

schema_builder = schema_builder
.add_normal_column(col)
.context(BuildTableSchema)?;
debug!("Create table to plan, stmt:{:?}", stmt);

// Build all column schemas.
let mut columns_by_name = stmt
.columns
.iter()
.map(|col| Ok((col.name.value.as_str(), parse_column(col)?)))
.collect::<Result<HashMap<_, _>>>()?;

let mut column_idxs_by_name: HashMap<_, _> = stmt
.columns
.iter()
.enumerate()
.map(|(idx, col)| (col.name.value.as_str(), idx))
.collect();

// Tsid column is a reserved column.
ensure!(
!columns_by_name.contains_key(TSID_COLUMN),
ColumnNameReserved {
name: TSID_COLUMN.to_string(),
}
}
);

let table_schema = schema_builder.build().context(BuildTableSchema)?;
let timestamp_column =
Self::find_and_ensure_timestamp_column(&columns_by_name, &stmt.constraints)?;
let tsid_column = Ident::with_quote(DEFAULT_QUOTE_CHAR, TSID_COLUMN);
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
let mut columns: Vec<_> = stmt.columns.iter().map(|col| col.name.clone()).collect();
let mut enable_tsid_primary_key = false;

let mut add_tsid_column = || {
columns_by_name.insert(TSID_COLUMN, Self::tsid_column_schema()?);
column_idxs_by_name.insert(TSID_COLUMN, columns.len());
columns.push(tsid_column.clone());
enable_tsid_primary_key = true;
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
};
let primary_key_columns =
match Self::find_and_ensure_primary_key_columns(&stmt.constraints)? {
Some(primary_key_columns) => {
// Ensure the primary key is defined already.
for col in &primary_key_columns {
let col_name = &col.value;
if col_name == TSID_COLUMN {
// tsid column is a reserved column which can't be
// defined by user, so let's add it manually.
add_tsid_column()?;
}
}

primary_key_columns
}
None => {
// No primary key is provided explicitly, so let's use `(tsid,
// timestamp_key)` as the default primary key.
add_tsid_column()?;

vec![tsid_column, timestamp_column]
}
};
let table_schema = Self::create_table_schema(
&columns,
&primary_key_columns,
columns_by_name,
column_idxs_by_name,
enable_tsid_primary_key,
)?;

let options = parse_options(stmt.options)?;

// ensure default value options are valid
ensure_column_default_value_valid(table_schema.columns(), &self.meta_provider)?;

// TODO(yingwen): Maybe support create table on other schema?
let table_name = stmt.table_name.to_string();
let table_ref = TableReference::from(table_name.as_str());
// Now we only takes the table name and ignore the schema and catalog name
let table = table_ref.table().to_string();

let plan = CreateTablePlan {
engine: stmt.engine,
if_not_exists: stmt.if_not_exists,
Expand Down
Loading