Commit

feat: replace column index with id when persisting schema in manifest (apache#652)

* Change column index to id when persisting the table schema (a before/after sketch of the persisted fields follows the file summary below).

* Rename filtered_row_group to target_row_group.
Rachelint authored Feb 16, 2023
1 parent 536cd0c commit 39281d5
Showing 5 changed files with 37 additions and 36 deletions.
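
The heart of the change is in how the manifest's persisted table schema refers to its timestamp and primary key columns: by column id instead of by position. Below is a minimal before/after sketch written as plain Rust structs; the stand-in ColumnSchemaPb type and the exact integer widths are assumptions (the diff only shows u32 for the old timestamp_index and u64 for the old primary_key_indexes), so treat it as an illustration of the shape, not the real generated code.

    // Sketch only, not from the patch: approximate shape of the persisted schema message.
    pub struct ColumnSchemaPb; // stand-in for schema_pb::ColumnSchema

    // Before: columns referenced by their position in `columns`.
    pub struct TableSchemaBefore {
        pub timestamp_index: u32,
        pub version: u32,
        pub columns: Vec<ColumnSchemaPb>,
        pub primary_key_indexes: Vec<u64>,
    }

    // After: columns referenced by their stable column ids.
    pub struct TableSchemaAfter {
        pub timestamp_id: u32,
        pub version: u32,
        pub columns: Vec<ColumnSchemaPb>,
        pub primary_key_ids: Vec<u32>,
    }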
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -195,19 +195,19 @@ impl<'a> Reader<'a> {
             "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
             suggest_read_parallelism, read_parallelism, chunk_size
         );
-        let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
+        let mut target_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
         for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
             let chunk_idx = row_group_idx % chunks_num;
-            filtered_row_group_chunks[chunk_idx].push(row_group);
+            target_row_group_chunks[chunk_idx].push(row_group);
         }
 
         let proj_mask = ProjectionMask::leaves(
             meta_data.parquet().file_metadata().schema_descr(),
             row_projector.existed_source_projection().iter().copied(),
         );
 
-        let mut streams = Vec::with_capacity(filtered_row_group_chunks.len());
-        for chunk in filtered_row_group_chunks {
+        let mut streams = Vec::with_capacity(target_row_group_chunks.len());
+        for chunk in target_row_group_chunks {
             let object_store_reader =
                 ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
             let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
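
Aside from the rename from filtered_row_group_chunks to target_row_group_chunks, the reader logic above is untouched: target row groups are dealt out round-robin across chunks_num chunks and each chunk gets its own parquet stream. A standalone sketch of that distribution step (a hypothetical helper, not code from the patch):

    // Hypothetical helper mirroring the round-robin split in Reader above.
    fn chunk_row_groups(target_row_groups: Vec<usize>, chunks_num: usize) -> Vec<Vec<usize>> {
        // Ceiling division so each chunk is allocated with enough capacity.
        let chunk_size = (target_row_groups.len() + chunks_num - 1) / chunks_num;
        let mut chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
        for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
            chunks[row_group_idx % chunks_num].push(row_group);
        }
        chunks
    }

For example, chunk_row_groups(vec![0, 1, 4, 5, 7], 2) yields [[0, 4, 7], [1, 5]].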
27 changes: 14 additions & 13 deletions common_types/src/schema.rs
@@ -950,12 +950,12 @@ impl TryFrom<schema_pb::TableSchema> for Schema {
 
     fn try_from(schema: schema_pb::TableSchema) -> Result<Self> {
         let mut builder = Builder::with_capacity(schema.columns.len()).version(schema.version);
-        let primary_key_indexes = schema.primary_key_indexes;
+        let primary_key_ids = schema.primary_key_ids;
 
-        for (i, column_schema_pb) in schema.columns.into_iter().enumerate() {
+        for column_schema_pb in schema.columns.into_iter() {
             let column =
                 ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?;
-            if primary_key_indexes.contains(&(i as u64)) {
+            if primary_key_ids.contains(&column.id) {
                 builder = builder.add_key_column(column)?;
             } else {
                 builder = builder.add_normal_column(column)?;
@@ -974,18 +974,19 @@ impl From<&Schema> for schema_pb::TableSchema {
             .map(|v| schema_pb::ColumnSchema::from(v.clone()))
             .collect();
 
-        let table_schema = schema_pb::TableSchema {
-            timestamp_index: schema.timestamp_index as u32,
+        let timestamp_id = schema.column(schema.timestamp_index()).id;
+        let primary_key_ids = schema
+            .primary_key_indexes()
+            .iter()
+            .map(|i| schema.column(*i).id)
+            .collect();
+
+        schema_pb::TableSchema {
+            timestamp_id,
             version: schema.version,
             columns,
-            primary_key_indexes: schema
-                .primary_key_indexes
-                .iter()
-                .map(|i| *i as u64)
-                .collect(),
-        };
-
-        table_schema
+            primary_key_ids,
+        }
     }
 }
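
One consequence of persisting ids in From<&Schema> above is that the key/normal classification in TryFrom no longer depends on the order in which columns happen to be stored. A small self-contained illustration of that property (hypothetical Column type and values, not CeresDB code):

    // Hypothetical illustration: id-based key lookup is insensitive to column order.
    struct Column {
        id: u32,
        name: &'static str,
    }

    fn main() {
        let primary_key_ids = [1_u32, 3];
        let persisted = vec![
            Column { id: 1, name: "ts" },
            Column { id: 3, name: "tag" },
            Column { id: 7, name: "value" },
        ];
        let reordered = vec![
            Column { id: 7, name: "value" },
            Column { id: 3, name: "tag" },
            Column { id: 1, name: "ts" },
        ];
        for columns in [&persisted, &reordered] {
            let keys: Vec<_> = columns
                .iter()
                .filter(|c| primary_key_ids.contains(&c.id))
                .map(|c| c.name)
                .collect();
            assert_eq!(keys.len(), 2); // "ts" and "tag" are found either way
        }
    }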

6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/min_max.rs
@@ -18,15 +18,15 @@ pub fn prune_row_groups(
     exprs: &[Expr],
     row_groups: &[RowGroupMetaData],
 ) -> Vec<usize> {
-    let mut filtered_row_groups = Vec::with_capacity(row_groups.len());
+    let mut target_row_groups = Vec::with_capacity(row_groups.len());
     let should_reads = filter_row_groups_inner(schema, exprs, row_groups);
     for (i, should_read) in should_reads.iter().enumerate() {
         if *should_read {
-            filtered_row_groups.push(i);
+            target_row_groups.push(i);
         }
     }
 
-    filtered_row_groups
+    target_row_groups
 }
 
 /// Determine whether a row group should be read according to the meta data
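
The pruning logic itself is unchanged by the rename: it collects the indexes of the row groups whose min/max filter said they should be read. The same selection expressed with iterators, as a sketch rather than a proposed change:

    // Sketch: equivalent selection of the row-group indexes flagged as readable.
    fn select_target_row_groups(should_reads: &[bool]) -> Vec<usize> {
        should_reads
            .iter()
            .enumerate()
            .filter(|(_, should_read)| **should_read)
            .map(|(i, _)| i)
            .collect()
    }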
2 changes: 1 addition & 1 deletion components/parquet_ext/src/reverse_reader.rs
@@ -108,7 +108,7 @@ impl<'a> Builder<'a> {
         self
     }
 
-    pub fn filtered_row_groups(mut self, row_groups: Option<Vec<usize>>) -> Self {
+    pub fn target_row_groups(mut self, row_groups: Option<Vec<usize>>) -> Self {
         self.row_groups = row_groups;
 
         self
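
For callers, only the builder method name changes; it is still a consuming setter that stores the optional row-group list and hands the builder back. A hypothetical, self-contained mini-builder showing the same pattern and call shape (not the real parquet_ext Builder):

    // Hypothetical mini-builder; only the method name mirrors the patch.
    struct ReaderBuilder {
        row_groups: Option<Vec<usize>>,
    }

    impl ReaderBuilder {
        fn target_row_groups(mut self, row_groups: Option<Vec<usize>>) -> Self {
            self.row_groups = row_groups;
            self
        }
    }

    fn main() {
        let builder = ReaderBuilder { row_groups: None }.target_row_groups(Some(vec![0, 2, 5]));
        assert_eq!(builder.row_groups, Some(vec![0, 2, 5]));
    }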
