diff --git a/Cargo.lock b/Cargo.lock index 1205376ee1..0fb7ab5745 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,7 +90,7 @@ dependencies = [ "async-trait", "base64 0.13.0", "bytes 1.2.1", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "common_types", "common_util", "datafusion", @@ -1111,7 +1111,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=43939799b2e65e3fc5795118fc77593f7c4b19d7#43939799b2e65e3fc5795118fc77593f7c4b19d7" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1#2d99b441d8dd947caa51019a9d2ff3873aaca6d1" dependencies = [ "prost", "protoc-bin-vendored", @@ -1122,7 +1122,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b#81a6d9ead104b2910f5c4484135054d51095090b" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=43939799b2e65e3fc5795118fc77593f7c4b19d7#43939799b2e65e3fc5795118fc77593f7c4b19d7" dependencies = [ "prost", "protoc-bin-vendored", @@ -1261,7 +1261,7 @@ name = "cluster" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "common_types", "common_util", "log", @@ -1312,7 +1312,7 @@ dependencies = [ "arrow_ext", "byteorder", "bytes_ext", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "chrono", "datafusion", "murmur3", @@ -1331,7 +1331,7 @@ dependencies = [ "arrow 32.0.0", "avro-rs", "backtrace", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "chrono", "common_types", "crossbeam-utils 0.8.11", @@ -3556,7 +3556,7 @@ name = "meta_client" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "common_types", "common_util", "futures 0.3.25", @@ -4051,7 +4051,7 @@ version = "1.0.0-alpha02" dependencies = [ "async-trait", "bytes 1.2.1", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "chrono", "clru", "common_util", @@ -5213,7 +5213,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "clru", "common_types", "common_util", @@ -5336,7 +5336,7 @@ name = "router" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "cluster", "common_types", "common_util", @@ -5675,7 +5675,7 @@ dependencies = [ "async-trait", "bytes 1.2.1", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "cluster", "common_types", "common_util", @@ -6003,7 +6003,7 @@ dependencies = [ "arrow 32.0.0", "async-trait", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "common_types", "common_util", "datafusion", @@ -6265,7 +6265,7 @@ dependencies = [ "arrow 32.0.0", "async-trait", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "common_types", "common_util", "futures 0.3.25", @@ -6282,7 +6282,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow 32.0.0", "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "common_types", "common_util", "datafusion", @@ -7204,7 +7204,7 @@ name = "wal" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=81a6d9ead104b2910f5c4484135054d51095090b)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", "chrono", "common_types", "common_util", diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 3e22842161..b601bdbcac 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -195,10 +195,10 @@ impl<'a> Reader<'a> { "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}", suggest_read_parallelism, read_parallelism, chunk_size ); - let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num]; + let mut target_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num]; for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() { let chunk_idx = row_group_idx % chunks_num; - filtered_row_group_chunks[chunk_idx].push(row_group); + target_row_group_chunks[chunk_idx].push(row_group); } let proj_mask = ProjectionMask::leaves( @@ -206,8 +206,8 @@ impl<'a> Reader<'a> { row_projector.existed_source_projection().iter().copied(), ); - let mut streams = Vec::with_capacity(filtered_row_group_chunks.len()); - for chunk in filtered_row_group_chunks { + let mut streams = Vec::with_capacity(target_row_group_chunks.len()); + for chunk in target_row_group_chunks { let object_store_reader = ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone()); let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 3dcbdb4d85..8eb9571719 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -950,12 +950,12 @@ impl TryFrom for Schema { fn try_from(schema: schema_pb::TableSchema) -> Result { let mut builder = Builder::with_capacity(schema.columns.len()).version(schema.version); - let primary_key_indexes = schema.primary_key_indexes; + let primary_key_ids = schema.primary_key_ids; - for (i, column_schema_pb) in schema.columns.into_iter().enumerate() { + for column_schema_pb in schema.columns.into_iter() { let column = ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?; - if primary_key_indexes.contains(&(i as u64)) { + if primary_key_ids.contains(&column.id) { builder = builder.add_key_column(column)?; } else { builder = builder.add_normal_column(column)?; @@ -974,18 +974,19 @@ impl From<&Schema> for schema_pb::TableSchema { .map(|v| schema_pb::ColumnSchema::from(v.clone())) .collect(); - let table_schema = schema_pb::TableSchema { - timestamp_index: schema.timestamp_index as u32, + let timestamp_id = schema.column(schema.timestamp_index()).id; + let primary_key_ids = schema + .primary_key_indexes() + .iter() + .map(|i| schema.column(*i).id) + .collect(); + + schema_pb::TableSchema { + timestamp_id, version: schema.version, columns, - primary_key_indexes: schema - .primary_key_indexes - .iter() - .map(|i| *i as u64) - .collect(), - }; - - table_schema + primary_key_ids, + } } } diff --git a/components/parquet_ext/src/prune/min_max.rs b/components/parquet_ext/src/prune/min_max.rs index 61c4cbd749..df7838080a 100644 --- a/components/parquet_ext/src/prune/min_max.rs +++ b/components/parquet_ext/src/prune/min_max.rs @@ -18,15 +18,15 @@ pub fn prune_row_groups( exprs: &[Expr], row_groups: &[RowGroupMetaData], ) -> Vec { - let mut filtered_row_groups = Vec::with_capacity(row_groups.len()); + let mut target_row_groups = Vec::with_capacity(row_groups.len()); let should_reads = filter_row_groups_inner(schema, exprs, row_groups); for (i, should_read) in should_reads.iter().enumerate() { if *should_read { - filtered_row_groups.push(i); + target_row_groups.push(i); } } - filtered_row_groups + target_row_groups } /// Determine whether a row group should be read according to the meta data diff --git a/components/parquet_ext/src/reverse_reader.rs b/components/parquet_ext/src/reverse_reader.rs index b43ba6a686..079cb75dce 100644 --- a/components/parquet_ext/src/reverse_reader.rs +++ b/components/parquet_ext/src/reverse_reader.rs @@ -108,7 +108,7 @@ impl<'a> Builder<'a> { self } - pub fn filtered_row_groups(mut self, row_groups: Option>) -> Self { + pub fn target_row_groups(mut self, row_groups: Option>) -> Self { self.row_groups = row_groups; self