Skip to content

Commit

Permalink
fix: ensure primary key order (apache#1292)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 authored Nov 6, 2023
1 parent 4860cd4 commit 2cb70f7
Show file tree
Hide file tree
Showing 18 changed files with 84 additions and 80 deletions.
1 change: 1 addition & 0 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ mod tests {
let schema = SchemaBuilder::new()
.add_key_column(column_schema)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();
let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ mod tests {

fn build_altered_schema(schema: &Schema) -> Schema {
let mut builder = schema::Builder::new().auto_increment_column_id(true);
let old_pk_indexes = schema.primary_key_indexes();
for column_schema in schema.key_columns() {
builder = builder
.add_key_column(column_schema.clone())
Expand All @@ -783,6 +784,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(old_pk_indexes.to_vec())
.build()
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/sst/meta_data/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ mod tests {
.unwrap()
.add_key_column(timestamp_column_schema)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
};
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ mod tests {
) -> WriteRequest {
let schema = FixedSchemaTable::default_schema_builder()
.version(schema_version)
.primary_key_indexes(vec![0, 1])
.build()
.unwrap();
let mut schema_rows = Vec::with_capacity(num_rows);
Expand Down
7 changes: 6 additions & 1 deletion analytic_engine/src/tests/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ async fn alter_schema_same_schema_version_case<T: WalsOpener>(

let mut schema_builder = FixedSchemaTable::default_schema_builder();
schema_builder = add_columns(schema_builder);
let new_schema = schema_builder.build().unwrap();
let new_schema = schema_builder
.primary_key_indexes(vec![0, 1])
.build()
.unwrap();

let table = test_ctx.table(table_name);
let old_schema = table.schema();
Expand Down Expand Up @@ -160,6 +163,7 @@ async fn alter_schema_old_pre_version_case<T: WalsOpener>(

let new_schema = schema_builder
.version(old_schema.version() + 1)
.primary_key_indexes(old_schema.primary_key_indexes().to_vec())
.build()
.unwrap();

Expand Down Expand Up @@ -190,6 +194,7 @@ async fn alter_schema_add_column_case<T: WalsOpener>(

let new_schema = schema_builder
.version(old_schema.version() + 1)
.primary_key_indexes(old_schema.primary_key_indexes().to_vec())
.build()
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/tests/drop_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,10 @@ fn test_alter_schema_drop_create<T: EngineBuildContext>(engine_context: T) {
.unwrap(),
)
.unwrap();

let new_schema = schema_builder
.version(old_schema.version() + 1)
.primary_key_indexes(old_schema.primary_key_indexes().to_vec())
.build()
.unwrap();
let request = AlterSchemaRequest {
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ pub fn create_schema_builder(
assert!(!key_tuples.is_empty());

let mut schema_builder = schema::Builder::with_capacity(key_tuples.len() + normal_tuples.len())
.auto_increment_column_id(true);
.auto_increment_column_id(true)
.primary_key_indexes((0..key_tuples.len()).collect());

for tuple in key_tuples {
// Key column is not nullable.
Expand Down
126 changes: 50 additions & 76 deletions common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Column id is missing in schema, id:{}.\nBacktrace:\n{}",
id,
backtrace
))]
ColumnIdMissing { id: ColumnId, backtrace: Backtrace },

#[snafu(display("Primary key indexes cannot be empty.\nBacktrace:\n{}", backtrace))]
EmptyPirmaryKeyIndexes { backtrace: Backtrace },

#[snafu(display(
"Unsupported key column type, name:{}, type:{:?}.\nBacktrace:\n{}",
name,
Expand Down Expand Up @@ -114,13 +124,6 @@ pub enum Error {
#[snafu(display("Timestamp not in primary key.\nBacktrace:\n{}", backtrace))]
TimestampNotInPrimaryKey { backtrace: Backtrace },

#[snafu(display(
"Key column cannot be nullable, name:{}.\nBacktrace:\n{}",
name,
backtrace
))]
NullKeyColumn { name: String, backtrace: Backtrace },

#[snafu(display(
"Invalid arrow field, field_name:{}, arrow_schema:{:?}, err:{}",
field_name,
Expand Down Expand Up @@ -958,62 +961,37 @@ impl Schema {
impl TryFrom<schema_pb::TableSchema> for Schema {
type Error = Error;

// We can't use Builder directly here, since it will disorder columns.
fn try_from(schema: schema_pb::TableSchema) -> Result<Self> {
let mut builder = Builder::with_capacity(schema.columns.len()).version(schema.version);
let primary_key_ids = schema.primary_key_ids;
let column_schemas = schema
.columns
.into_iter()
.map(|column_schema_pb| {
ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)

let primary_key_indexes = primary_key_ids
.iter()
.cloned()
.map(|id| {
let col_idx = schema
.columns
.iter()
.enumerate()
.find_map(|(idx, col)| if col.id == id { Some(idx) } else { None })
.context(ColumnIdMissing { id })?;

Ok(col_idx)
})
.collect::<Result<Vec<_>>>()?;
builder = builder.primary_key_indexes(primary_key_indexes);

let mut primary_key_indexes = Vec::with_capacity(primary_key_ids.len());
let mut timestamp_index = None;
for pk_id in &primary_key_ids {
for (idx, col) in column_schemas.iter().enumerate() {
if col.id == *pk_id {
primary_key_indexes.push(idx);
if DatumKind::Timestamp == col.data_type {
// TODO: add a timestamp_id in schema_pb, so we can have two timestamp
// columns in primary keys.
if let Some(idx) = timestamp_index {
let column_schema: &ColumnSchema = &column_schemas[idx];
return TimestampKeyExists {
timestamp_column: column_schema.name.to_string(),
given_column: col.name.clone(),
}
.fail();
}

timestamp_index = Some(idx);
}
break;
}
for column_schema_pb in schema.columns {
let column =
ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?;
if primary_key_ids.contains(&column.id) {
builder = builder.add_key_column(column)?;
} else {
builder = builder.add_normal_column(column)?;
}
}

let timestamp_index = timestamp_index.context(TimestampNotInPrimaryKey)?;
let tsid_index = Builder::find_tsid_index(&column_schemas);
let fields = column_schemas
.iter()
.map(|c| c.to_arrow_field())
.collect::<Vec<_>>();
let meta = Builder::build_arrow_schema_meta(
primary_key_indexes.clone(),
timestamp_index,
schema.version,
);

Ok(Schema {
arrow_schema: Arc::new(ArrowSchema::new_with_metadata(fields, meta)),
primary_key_indexes,
column_schemas: Arc::new(ColumnSchemas::new(column_schemas)),
version: schema.version,
tsid_index,
timestamp_index,
})
builder.build()
}
}

Expand Down Expand Up @@ -1090,8 +1068,6 @@ impl Builder {
self.may_alloc_column_id(&mut column);
self.validate_column(&column, true)?;

ensure!(!column.is_nullable, NullKeyColumn { name: column.name });

// FIXME(xikai): it seems not reasonable to decide the timestamp column in this
// way.
let is_timestamp = DatumKind::Timestamp == column.data_type;
Expand All @@ -1106,7 +1082,6 @@ impl Builder {
self.timestamp_index = Some(self.columns.len());
}

self.primary_key_indexes.push(self.columns.len());
self.insert_new_column(column);

Ok(self)
Expand All @@ -1122,6 +1097,12 @@ impl Builder {
Ok(self)
}

/// Set primary key indexes of the schema
pub fn primary_key_indexes(mut self, indexes: Vec<usize>) -> Self {
self.primary_key_indexes = indexes;
self
}

/// Set version of the schema
pub fn version(mut self, version: Version) -> Self {
self.version = version;
Expand Down Expand Up @@ -1264,7 +1245,10 @@ impl Builder {
let timestamp_index = self.timestamp_index.context(TimestampNotInPrimaryKey)?;

// Timestamp key column is exists, so key columns should not be zero
assert!(!self.primary_key_indexes.is_empty());
ensure!(
!self.primary_key_indexes.is_empty(),
EmptyPirmaryKeyIndexes {}
);

let tsid_index = Self::find_tsid_index(&self.columns);
let fields = self
Expand Down Expand Up @@ -1373,6 +1357,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
}
Expand Down Expand Up @@ -1466,6 +1451,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![1, 2])
.build()
.unwrap();

Expand Down Expand Up @@ -1566,6 +1552,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();
}
Expand All @@ -1586,23 +1573,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap();
assert!(builder.build().is_err());
}

// Currently we allow null key column, maybe we can rename it to sorted column.
// Since we primary key in ceresdb isn't same with MySQL, and it only served for
// sort.
#[test]
fn test_null_key() {
assert!(Builder::new()
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.id(1)
.is_nullable(true)
.build()
.expect("should succeed build column schema")
)
.is_err());
assert!(builder.primary_key_indexes(vec![0]).build().is_err());
}

#[test]
Expand Down Expand Up @@ -1637,6 +1608,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();

Expand Down Expand Up @@ -1690,6 +1662,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap();

Expand Down Expand Up @@ -1796,6 +1769,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.expect("should succeed to build schema");

Expand Down
4 changes: 3 additions & 1 deletion common_types/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ fn base_schema_builder() -> schema::Builder {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
}

fn default_value_schema_builder() -> schema::Builder {
schema::Builder::new()
.auto_increment_column_id(true)
.primary_key_indexes(vec![0, 1])
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.build()
Expand Down Expand Up @@ -233,7 +235,7 @@ pub fn build_schema_for_cpu() -> Schema {
)
.unwrap();

builder.build().unwrap()
builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}

#[allow(clippy::too_many_arguments)]
Expand Down
1 change: 1 addition & 0 deletions interpreters/src/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ fn build_new_schema(current_schema: &Schema, column_schemas: Vec<ColumnSchema>)

let mut builder =
schema::Builder::with_capacity(current_schema.num_columns() + column_schemas.len())
.primary_key_indexes(current_schema.primary_key_indexes().to_vec())
// Increment the schema version.
.version(current_version + 1);
for (idx, column) in current_schema.columns().iter().enumerate() {
Expand Down
1 change: 1 addition & 0 deletions proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ mod tests {
.unwrap(),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ mod tests {
.unwrap(),
)
.unwrap()
.primary_key_indexes(vec![0, 1])
.build()
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions proxy/src/influxdb/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ mod tests {
.expect("should succeed build column schema"),
)
.unwrap()
.primary_key_indexes(vec![0])
.build()
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ mod test {
.unwrap(),
)
.unwrap()
.primary_key_indexes(vec![0, 1, 2])
.build()
.unwrap()
}
Expand Down
Loading

0 comments on commit 2cb70f7

Please sign in to comment.