Skip to content

Commit

Permalink
fix: wrong primary key when define tsid and timestamp key as primary …
Browse files Browse the repository at this point in the history
…key (#453)

* fix: wrong primary key when defining tsid and timestamp key as primary key

* chore: add test for define primary with tsid

* chore: fix harness test

* refactor: add some comments

* fix: wrong order of the primary key columns

* chore: replace BTreeMap with HashMap for columns_by_names
  • Loading branch information
ShiKaiWi authored Dec 7, 2022
1 parent f1d97e7 commit be10d88
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 180 deletions.
282 changes: 178 additions & 104 deletions sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use hashbrown::HashMap as NoStdHashMap;
use log::debug;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use sqlparser::ast::{
ColumnDef, ColumnOption, Expr, ObjectName, Query, SetExpr, SqlOption,
ColumnDef, ColumnOption, Expr, Ident, ObjectName, Query, SetExpr, SqlOption,
Statement as SqlStatement, TableConstraint, UnaryOperator, Value, Values,
};
use table_engine::table::TableRef;
Expand Down Expand Up @@ -100,14 +100,33 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Undefined column is used in primary key, column name:{}.\nBacktrace:\n{}",
name,
backtrace
))]
UndefinedColumnInPrimaryKey { name: String, backtrace: Backtrace },

#[snafu(display("Primary key not found, column name:{}", name))]
PrimaryKeyNotFound { name: String },

#[snafu(display(
"Duplicate definitions of primary key are found, first:{:?}, second:{:?}.\nBacktrace:\n{:?}",
first,
second,
backtrace,
))]
DuplicatePrimaryKey {
first: Vec<Ident>,
second: Vec<Ident>,
backtrace: Backtrace,
},

#[snafu(display("Tag column not found, name:{}", name))]
TagColumnNotFound { name: String },

#[snafu(display(
"Timestamp key column can not be tag, name:{}.\nBactrace:\n{:?}",
"Timestamp key column can not be tag, name:{}.\nBacktrace:\n{:?}",
name,
backtrace
))]
Expand Down Expand Up @@ -214,6 +233,8 @@ pub enum Error {

define_result!(Error);

const DEFAULT_QUOTE_CHAR: char = '`';

/// Planner produces logical plans from SQL AST
// TODO(yingwen): Rewrite Planner instead of using datafusion's planner
pub struct Planner<'a, P: MetaProvider> {
Expand Down Expand Up @@ -322,143 +343,196 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
})
}

fn create_table_to_plan(&self, stmt: CreateTable) -> Result<Plan> {
ensure!(!stmt.table_name.is_empty(), CreateTableNameEmpty);

debug!("Create table to plan, stmt:{:?}", stmt);

// TODO(yingwen): Maybe support create table on other schema?
let table_name = stmt.table_name.to_string();
let table_ref = TableReference::from(table_name.as_str());

// For now we only take the table name and ignore the schema and catalog name
let table = table_ref.table().to_string();

let mut schema_builder =
schema::Builder::with_capacity(stmt.columns.len()).auto_increment_column_id(true);

// Build all column schemas.
let mut name_column_map = stmt
.columns
.iter()
.map(|col| Ok((col.name.value.as_str(), parse_column(col)?)))
.collect::<Result<BTreeMap<_, _>>>()?;

let name_column_index_map = stmt
.columns
.iter()
.enumerate()
.map(|(idx, col)| (col.name.value.as_str(), idx))
.collect::<BTreeMap<_, _>>();
fn create_table_schema(
columns: &[Ident],
primary_key_columns: &[Ident],
mut columns_by_name: HashMap<&str, ColumnSchema>,
column_idxs_by_name: HashMap<&str, usize>,
enable_tsid_primary_key: bool,
) -> Result<Schema> {
assert_eq!(columns_by_name.len(), column_idxs_by_name.len());

let mut schema_builder = schema::Builder::with_capacity(columns_by_name.len())
.auto_increment_column_id(true)
.enable_tsid_primary_key(enable_tsid_primary_key);

// Collect the key columns.
for key_col in primary_key_columns {
let col_name = key_col.value.as_str();
let col = columns_by_name
.remove(col_name)
.context(UndefinedColumnInPrimaryKey { name: col_name })?;
schema_builder = schema_builder
.add_key_column(col)
.context(BuildTableSchema)?;
}

// Tsid column is a reserved column.
ensure!(
!name_column_map.contains_key(TSID_COLUMN),
ColumnNameReserved {
name: TSID_COLUMN.to_string(),
// Collect the normal columns.
for normal_col in columns {
let col_name = normal_col.value.as_str();
// Only normal columns are kept in the `columns_by_name`.
if let Some(col) = columns_by_name.remove(col_name) {
schema_builder = schema_builder
.add_normal_column(col)
.context(BuildTableSchema)?;
}
);

// Find timestamp key and primary key constraint

let mut timestamp_column_idx = None;
let mut timestamp_name = None;
}

let mut primary_key_column_idxs = vec![];
schema_builder.build().context(BuildTableSchema)
}

for constraint in stmt.constraints.iter() {
// Find the primary key columns and ensure at most one primary key constraint exists.
fn find_and_ensure_primary_key_columns(
constraints: &[TableConstraint],
) -> Result<Option<Vec<Ident>>> {
let mut primary_key_columns: Option<Vec<Ident>> = None;
for constraint in constraints {
if let TableConstraint::Unique {
columns,
is_primary,
..
} = constraint
{
if *is_primary {
// Build primary key, the builder will check timestamp column is in primary key.
for column in columns {
if let Some(idx) = name_column_index_map.get(&*column.value) {
primary_key_column_idxs.push(*idx);
ensure!(
primary_key_columns.is_none(),
DuplicatePrimaryKey {
first: primary_key_columns.unwrap(),
second: columns.clone()
}
}
} else if parser::is_timestamp_key_constraint(constraint) {
);
primary_key_columns = Some(columns.clone());
}
}
}

Ok(primary_key_columns)
}

// Find the timestamp column and ensure its valid existence (only one).
fn find_and_ensure_timestamp_column(
columns_by_name: &HashMap<&str, ColumnSchema>,
constraints: &[TableConstraint],
) -> Result<Ident> {
let mut timestamp_column_name = None;
for constraint in constraints {
if let TableConstraint::Unique { columns, .. } = constraint {
if parser::is_timestamp_key_constraint(constraint) {
// Only one timestamp key constraint
ensure!(timestamp_column_idx.is_none(), InvalidTimestampKey);
ensure!(timestamp_column_name.is_none(), InvalidTimestampKey);
// Only one column in constraint
ensure!(columns.len() == 1, InvalidTimestampKey);
let timestamp_ident = columns[0].clone();

let name = &columns[0].value;
let timestamp_column = name_column_map
.get(name as &str)
.context(TimestampColumnNotFound { name })?;
// Ensure type is timestamp
let timestamp_column = columns_by_name
.get(timestamp_ident.value.as_str())
.context(TimestampColumnNotFound {
name: &timestamp_ident.value,
})?;

// Ensure the timestamp key's type is timestamp.
ensure!(
timestamp_column.data_type == DatumKind::Timestamp,
InvalidTimestampKey
);
let column_idx = name_column_index_map
.get(name as &str)
.context(TimestampColumnNotFound { name })?;
// Ensure the timestamp key is not a tag.
ensure!(
!timestamp_column.is_tag,
TimestampKeyTag {
name: &timestamp_ident.value,
}
);

timestamp_column_idx = Some(*column_idx);
timestamp_name = Some(name.clone());
timestamp_column_name = Some(timestamp_ident);
}
}
}

// Timestamp column must be provided.
let timestamp_name = timestamp_name.context(RequireTimestamp)?;
// The timestamp key column must not be a Tag column
if let Some(timestamp_column) = name_column_map.get(&timestamp_name.as_str()) {
ensure!(
!timestamp_column.is_tag,
TimestampKeyTag {
name: &timestamp_name,
}
)
}
timestamp_column_name.context(RequireTimestamp)
}

let timestamp_col_idx = timestamp_column_idx.context(RequireTimestamp)?;
// The key columns have been consumed.
for (idx, col) in stmt.columns.iter().enumerate() {
let col_name = col.name.value.as_str();
if let Some(col) = name_column_map.remove(col_name) {
if !primary_key_column_idxs.is_empty() {
if primary_key_column_idxs.contains(&idx) {
let key_column = if TSID_COLUMN == col.name {
schema_builder = schema_builder.enable_tsid_primary_key(true);
Self::tsid_column_schema()?
} else {
col
};
schema_builder = schema_builder
.add_key_column(key_column)
.context(BuildTableSchema)?;
continue;
}
} else if timestamp_col_idx == idx {
// If primary key is not set, Use (timestamp, tsid) as primary key.
schema_builder = schema_builder
.enable_tsid_primary_key(true)
.add_key_column(col)
.context(BuildTableSchema)?
.add_key_column(Self::tsid_column_schema()?)
.context(BuildTableSchema)?;
continue;
}
fn create_table_to_plan(&self, stmt: CreateTable) -> Result<Plan> {
ensure!(!stmt.table_name.is_empty(), CreateTableNameEmpty);

schema_builder = schema_builder
.add_normal_column(col)
.context(BuildTableSchema)?;
debug!("Create table to plan, stmt:{:?}", stmt);

// Build all column schemas.
let mut columns_by_name = stmt
.columns
.iter()
.map(|col| Ok((col.name.value.as_str(), parse_column(col)?)))
.collect::<Result<HashMap<_, _>>>()?;

let mut column_idxs_by_name: HashMap<_, _> = stmt
.columns
.iter()
.enumerate()
.map(|(idx, col)| (col.name.value.as_str(), idx))
.collect();

// Tsid column is a reserved column.
ensure!(
!columns_by_name.contains_key(TSID_COLUMN),
ColumnNameReserved {
name: TSID_COLUMN.to_string(),
}
}
);

let table_schema = schema_builder.build().context(BuildTableSchema)?;
let timestamp_column =
Self::find_and_ensure_timestamp_column(&columns_by_name, &stmt.constraints)?;
let tsid_column = Ident::with_quote(DEFAULT_QUOTE_CHAR, TSID_COLUMN);
let mut columns: Vec<_> = stmt.columns.iter().map(|col| col.name.clone()).collect();
let mut enable_tsid_primary_key = false;

let mut add_tsid_column = || {
columns_by_name.insert(TSID_COLUMN, Self::tsid_column_schema()?);
column_idxs_by_name.insert(TSID_COLUMN, columns.len());
columns.push(tsid_column.clone());
enable_tsid_primary_key = true;
Ok(())
};
let primary_key_columns =
match Self::find_and_ensure_primary_key_columns(&stmt.constraints)? {
Some(primary_key_columns) => {
// Ensure the primary key is defined already.
for col in &primary_key_columns {
let col_name = &col.value;
if col_name == TSID_COLUMN {
// tsid column is a reserved column which can't be
// defined by user, so let's add it manually.
add_tsid_column()?;
}
}

primary_key_columns
}
None => {
// No primary key is provided explicitly, so let's use `(tsid,
// timestamp_key)` as the default primary key.
add_tsid_column()?;

vec![tsid_column, timestamp_column]
}
};
let table_schema = Self::create_table_schema(
&columns,
&primary_key_columns,
columns_by_name,
column_idxs_by_name,
enable_tsid_primary_key,
)?;

let options = parse_options(stmt.options)?;

// ensure default value options are valid
ensure_column_default_value_valid(table_schema.columns(), &self.meta_provider)?;

// TODO(yingwen): Maybe support create table on other schema?
let table_name = stmt.table_name.to_string();
let table_ref = TableReference::from(table_name.as_str());
// For now we only take the table name and ignore the schema and catalog name
let table = table_ref.table().to_string();

let plan = CreateTablePlan {
engine: stmt.engine,
if_not_exists: stmt.if_not_exists,
Expand Down
Loading

0 comments on commit be10d88

Please sign in to comment.