Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure primary key order #1292

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ mod tests {
let schema = SchemaBuilder::new()
.add_key_column(column_schema)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();
let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ mod tests {

fn build_altered_schema(schema: &Schema) -> Schema {
let mut builder = schema::Builder::new().auto_increment_column_id(true);
let old_pk_indexes = schema.primary_key_indexes();
for column_schema in schema.key_columns() {
builder = builder
.add_key_column(column_schema.clone())
Expand All @@ -783,6 +784,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(old_pk_indexes.to_vec())
.build()
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/sst/meta_data/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ mod tests {
.unwrap()
.add_key_column(timestamp_column_schema)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
};
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ mod tests {
) -> WriteRequest {
let schema = FixedSchemaTable::default_schema_builder()
.version(schema_version)
.primary_key_indexes(vec![0, 1])
.build()
.unwrap();
let mut schema_rows = Vec::with_capacity(num_rows);
Expand Down
7 changes: 6 additions & 1 deletion analytic_engine/src/tests/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ async fn alter_schema_same_schema_version_case<T: WalsOpener>(

let mut schema_builder = FixedSchemaTable::default_schema_builder();
schema_builder = add_columns(schema_builder);
let new_schema = schema_builder.build().unwrap();
let new_schema = schema_builder
.primary_key_indexes(vec![0, 1])
.build()
.unwrap();

let table = test_ctx.table(table_name);
let old_schema = table.schema();
Expand Down Expand Up @@ -160,6 +163,7 @@ async fn alter_schema_old_pre_version_case<T: WalsOpener>(

let new_schema = schema_builder
.version(old_schema.version() + 1)
.primary_key_indexes(old_schema.primary_key_indexes().to_vec())
.build()
.unwrap();

Expand Down Expand Up @@ -190,6 +194,7 @@ async fn alter_schema_add_column_case<T: WalsOpener>(

let new_schema = schema_builder
.version(old_schema.version() + 1)
.primary_key_indexes(old_schema.primary_key_indexes().to_vec())
.build()
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/tests/drop_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,10 @@ fn test_alter_schema_drop_create<T: EngineBuildContext>(engine_context: T) {
.unwrap(),
)
.unwrap();

let new_schema = schema_builder
.version(old_schema.version() + 1)
.primary_key_indexes(old_schema.primary_key_indexes().to_vec())
.build()
.unwrap();
let request = AlterSchemaRequest {
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ pub fn create_schema_builder(
assert!(!key_tuples.is_empty());

let mut schema_builder = schema::Builder::with_capacity(key_tuples.len() + normal_tuples.len())
.auto_increment_column_id(true);
.auto_increment_column_id(true)
.primary_key_indexes((0..key_tuples.len()).collect());

for tuple in key_tuples {
// Key column is not nullable.
Expand Down
126 changes: 50 additions & 76 deletions common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Column id is missing in schema, id:{}.\nBacktrace:\n{}",
id,
backtrace
))]
ColumnIdMissing { id: ColumnId, backtrace: Backtrace },

#[snafu(display("Primary key indexes cannot be empty.\nBacktrace:\n{}", backtrace))]
EmptyPirmaryKeyIndexes { backtrace: Backtrace },

#[snafu(display(
"Unsupported key column type, name:{}, type:{:?}.\nBacktrace:\n{}",
name,
Expand Down Expand Up @@ -114,13 +124,6 @@ pub enum Error {
#[snafu(display("Timestamp not in primary key.\nBacktrace:\n{}", backtrace))]
TimestampNotInPrimaryKey { backtrace: Backtrace },

#[snafu(display(
"Key column cannot be nullable, name:{}.\nBacktrace:\n{}",
name,
backtrace
))]
NullKeyColumn { name: String, backtrace: Backtrace },

#[snafu(display(
"Invalid arrow field, field_name:{}, arrow_schema:{:?}, err:{}",
field_name,
Expand Down Expand Up @@ -958,62 +961,37 @@ impl Schema {
impl TryFrom<schema_pb::TableSchema> for Schema {
type Error = Error;

// We can't use Builder directly here, since it will disorder columns.
fn try_from(schema: schema_pb::TableSchema) -> Result<Self> {
let mut builder = Builder::with_capacity(schema.columns.len()).version(schema.version);
let primary_key_ids = schema.primary_key_ids;
let column_schemas = schema
.columns
.into_iter()
.map(|column_schema_pb| {
ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)

let primary_key_indexes = primary_key_ids
.iter()
.cloned()
.map(|id| {
let col_idx = schema
.columns
.iter()
.enumerate()
.find_map(|(idx, col)| if col.id == id { Some(idx) } else { None })
.context(ColumnIdMissing { id })?;

Ok(col_idx)
})
.collect::<Result<Vec<_>>>()?;
builder = builder.primary_key_indexes(primary_key_indexes);

let mut primary_key_indexes = Vec::with_capacity(primary_key_ids.len());
let mut timestamp_index = None;
for pk_id in &primary_key_ids {
for (idx, col) in column_schemas.iter().enumerate() {
if col.id == *pk_id {
primary_key_indexes.push(idx);
if DatumKind::Timestamp == col.data_type {
// TODO: add a timestamp_id in schema_pb, so we can have two timestamp
// columns in primary keys.
if let Some(idx) = timestamp_index {
let column_schema: &ColumnSchema = &column_schemas[idx];
return TimestampKeyExists {
timestamp_column: column_schema.name.to_string(),
given_column: col.name.clone(),
}
.fail();
}

timestamp_index = Some(idx);
}
break;
}
for column_schema_pb in schema.columns.into_iter() {
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
let column =
ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?;
if primary_key_ids.contains(&column.id) {
builder = builder.add_key_column(column)?;
} else {
builder = builder.add_normal_column(column)?;
}
}

let timestamp_index = timestamp_index.context(TimestampNotInPrimaryKey)?;
let tsid_index = Builder::find_tsid_index(&column_schemas);
let fields = column_schemas
.iter()
.map(|c| c.to_arrow_field())
.collect::<Vec<_>>();
let meta = Builder::build_arrow_schema_meta(
primary_key_indexes.clone(),
timestamp_index,
schema.version,
);

Ok(Schema {
arrow_schema: Arc::new(ArrowSchema::new_with_metadata(fields, meta)),
primary_key_indexes,
column_schemas: Arc::new(ColumnSchemas::new(column_schemas)),
version: schema.version,
tsid_index,
timestamp_index,
})
builder.build()
}
}

Expand Down Expand Up @@ -1090,8 +1068,6 @@ impl Builder {
self.may_alloc_column_id(&mut column);
self.validate_column(&column, true)?;

ensure!(!column.is_nullable, NullKeyColumn { name: column.name });

// FIXME(xikai): it seems not reasonable to decide the timestamp column in this
// way.
let is_timestamp = DatumKind::Timestamp == column.data_type;
Expand All @@ -1106,7 +1082,6 @@ impl Builder {
self.timestamp_index = Some(self.columns.len());
}

self.primary_key_indexes.push(self.columns.len());
self.insert_new_column(column);

Ok(self)
Expand All @@ -1122,6 +1097,12 @@ impl Builder {
Ok(self)
}

/// Set primary key indexes of the schema.
///
/// Overwrites whatever indexes the builder currently holds with `indexes`
/// and returns the builder so calls can be chained. This setter performs no
/// validation of its own.
///
/// NOTE(review): the values appear to be positions of columns in the order
/// they were added to the builder — confirm against `build`.
pub fn primary_key_indexes(mut self, indexes: Vec<usize>) -> Self {
self.primary_key_indexes = indexes;
self
}

/// Set version of the schema
pub fn version(mut self, version: Version) -> Self {
self.version = version;
Expand Down Expand Up @@ -1264,7 +1245,10 @@ impl Builder {
let timestamp_index = self.timestamp_index.context(TimestampNotInPrimaryKey)?;

// The timestamp key column exists, so the key columns should not be empty
assert!(!self.primary_key_indexes.is_empty());
ensure!(
!self.primary_key_indexes.is_empty(),
EmptyPirmaryKeyIndexes {}
);

let tsid_index = Self::find_tsid_index(&self.columns);
let fields = self
Expand Down Expand Up @@ -1373,6 +1357,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
}
Expand Down Expand Up @@ -1466,6 +1451,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![1, 2])
.build()
.unwrap();

Expand Down Expand Up @@ -1566,6 +1552,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();
}
Expand All @@ -1586,23 +1573,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap();
assert!(builder.build().is_err());
}

// Currently we allow null key columns; maybe we can rename them to sorted columns,
// since the primary key in ceresdb isn't the same as in MySQL and only serves for
// sorting.
#[test]
fn test_null_key() {
assert!(Builder::new()
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.id(1)
.is_nullable(true)
.build()
.expect("should succeed build column schema")
)
.is_err());
assert!(builder.primary_key_indexes(vec![0]).build().is_err());
}

#[test]
Expand Down Expand Up @@ -1637,6 +1608,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();

Expand Down Expand Up @@ -1690,6 +1662,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap();

Expand Down Expand Up @@ -1796,6 +1769,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.expect("should succeed to build schema");

Expand Down
4 changes: 3 additions & 1 deletion common_types/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ fn base_schema_builder() -> schema::Builder {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
}

fn default_value_schema_builder() -> schema::Builder {
schema::Builder::new()
.auto_increment_column_id(true)
.primary_key_indexes(vec![0, 1])
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.build()
Expand Down Expand Up @@ -233,7 +235,7 @@ pub fn build_schema_for_cpu() -> Schema {
)
.unwrap();

builder.build().unwrap()
builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}

#[allow(clippy::too_many_arguments)]
Expand Down
1 change: 1 addition & 0 deletions interpreters/src/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ fn build_new_schema(current_schema: &Schema, column_schemas: Vec<ColumnSchema>)

let mut builder =
schema::Builder::with_capacity(current_schema.num_columns() + column_schemas.len())
.primary_key_indexes(current_schema.primary_key_indexes().to_vec())
// Increment the schema version.
.version(current_version + 1);
for (idx, column) in current_schema.columns().iter().enumerate() {
Expand Down
1 change: 1 addition & 0 deletions proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ mod tests {
.unwrap(),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ mod tests {
.unwrap(),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions proxy/src/influxdb/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ mod test {
.unwrap(),
)
.unwrap()
.primary_key_indexes(vec![0, 1, 2])
.build()
.unwrap()
}
Expand Down
Loading
Loading