diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs
index bed6a71eee..0ed7414b9c 100644
--- a/analytic_engine/src/instance/write.rs
+++ b/analytic_engine/src/instance/write.rs
@@ -712,7 +712,7 @@ mod tests {
     use common_types::{
         column_schema::Builder as ColumnSchemaBuilder,
         datum::{Datum, DatumKind},
-        row::{Row, RowGroupBuilder},
+        row::Row,
         schema::Builder as SchemaBuilder,
         time::Timestamp,
     };
@@ -738,7 +738,7 @@ mod tests {
             .primary_key_indexes(vec![0])
             .build()
             .unwrap();
-        let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
+        let row_group = RowGroup::try_new(schema, rows).unwrap();
 
         (encoded_rows, row_group)
     }
diff --git a/analytic_engine/src/payload.rs b/analytic_engine/src/payload.rs
index 032b40c5cf..1b70be5562 100644
--- a/analytic_engine/src/payload.rs
+++ b/analytic_engine/src/payload.rs
@@ -22,7 +22,7 @@ use codec::{
     Decoder,
 };
 use common_types::{
-    row::{RowGroup, RowGroupBuilder, RowGroupBuilderFromColumn},
+    row::{RowGroup, RowGroupBuilderFromColumn},
     schema::Schema,
     table::TableId,
 };
@@ -202,18 +202,19 @@ impl ReadPayload {
         // Consume and convert rows in pb
         let encoded_rows = write_req_pb.rows;
-        let mut builder = RowGroupBuilder::with_capacity(schema.clone(), encoded_rows.len());
+        let mut rows = Vec::with_capacity(encoded_rows.len());
         let row_decoder = WalRowDecoder::new(&schema);
         for row_bytes in &encoded_rows {
             let row = row_decoder
                 .decode(&mut row_bytes.as_slice())
                 .context(DecodeRow)?;
             // We skip schema check here
-            builder.push_checked_row(row);
+            rows.push(row);
         }
-        let row_group = builder.build();
-
+        // The `rows` are decoded according to the schema, so there is no need to do one
+        // more check here.
+        let row_group = RowGroup::new_unchecked(schema, rows);
 
         Ok(Self::Write { row_group })
     }
diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs
index a502715e32..6d8bc6c3b1 100644
--- a/analytic_engine/src/table/mod.rs
+++ b/analytic_engine/src/table/mod.rs
@@ -22,7 +22,7 @@ use std::{
 
 use async_trait::async_trait;
 use common_types::{
-    row::{Row, RowGroupBuilder},
+    row::{Row, RowGroup},
     schema::Schema,
     time::TimeRange,
 };
@@ -251,20 +251,19 @@ fn merge_pending_write_requests(
     assert!(!pending_writes.is_empty());
 
     let mut last_req = pending_writes.pop().unwrap();
-    let last_rows = last_req.row_group.take_rows();
-    let schema = last_req.row_group.into_schema();
-    let mut row_group_builder = RowGroupBuilder::with_capacity(schema, num_pending_rows);
-
-    for mut pending_req in pending_writes {
-        let rows = pending_req.row_group.take_rows();
-        for row in rows {
-            row_group_builder.push_checked_row(row)
+    let total_rows = {
+        let mut rows = Vec::with_capacity(num_pending_rows);
+        for mut pending_req in pending_writes {
+            let mut pending_rows = pending_req.row_group.take_rows();
+            rows.append(&mut pending_rows);
         }
-    }
-    for row in last_rows {
-        row_group_builder.push_checked_row(row);
-    }
-    let row_group = row_group_builder.build();
+        let mut last_rows = last_req.row_group.take_rows();
+        rows.append(&mut last_rows);
+        rows
+    };
+
+    let schema = last_req.row_group.into_schema();
+    let row_group = RowGroup::new_unchecked(schema, total_rows);
 
     WriteRequest { row_group }
 }
@@ -653,7 +652,7 @@ mod tests {
             schema_rows.push(row);
         }
         let rows = row_util::new_rows_6(&schema_rows);
-        let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
+        let row_group = RowGroup::try_new(schema, rows).unwrap();
 
         WriteRequest { row_group }
     }
diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs
index 579c2be65d..6ba60424d9 100644
--- a/analytic_engine/src/tests/alter_test.rs
+++ b/analytic_engine/src/tests/alter_test.rs
@@ -19,7 +19,7 @@ use std::collections::{BTreeMap, HashMap};
 use common_types::{
     column_schema,
     datum::DatumKind,
-    row::{RowGroup, RowGroupBuilder},
+    row::RowGroup,
     schema::{self, Schema},
     time::Timestamp,
 };
@@ -232,9 +232,7 @@ async fn alter_schema_add_column_case(
         ),
     ];
     let rows_vec = row_util::new_rows_8(&rows);
-    let row_group = RowGroupBuilder::with_rows(new_schema.clone(), rows_vec)
-        .unwrap()
-        .build();
+    let row_group = RowGroup::try_new(new_schema.clone(), rows_vec).unwrap();
 
     // Write data with new schema.
     test_ctx.write_to_table(table_name, row_group).await;
@@ -288,9 +286,7 @@ async fn alter_schema_add_column_case(
         )),
     ];
     let new_schema_row_group =
-        RowGroupBuilder::with_rows(new_schema.clone(), new_schema_rows.to_vec())
-            .unwrap()
-            .build();
+        RowGroup::try_new(new_schema.clone(), new_schema_rows.to_vec()).unwrap();
 
     // Read data using new schema.
     check_read_row_group(
@@ -337,9 +333,7 @@ async fn alter_schema_add_column_case(
         ),
     ];
     let old_schema_rows_vec = row_util::new_rows_6(&old_schema_rows);
-    let old_schema_row_group = RowGroupBuilder::with_rows(old_schema.clone(), old_schema_rows_vec)
-        .unwrap()
-        .build();
+    let old_schema_row_group = RowGroup::try_new(old_schema.clone(), old_schema_rows_vec).unwrap();
 
     // Read data using old schema.
     check_read_row_group(
diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs
index ec6236d92f..1b4f2f898b 100644
--- a/analytic_engine/src/tests/table.rs
+++ b/analytic_engine/src/tests/table.rs
@@ -22,7 +22,7 @@ use common_types::{
     projected_schema::ProjectedSchema,
     record_batch::RecordBatch,
     request_id::RequestId,
-    row::{Row, RowGroup, RowGroupBuilder},
+    row::{Row, RowGroup},
     schema::{self, Schema},
     table::DEFAULT_SHARD_ID,
     time::Timestamp,
@@ -122,9 +122,7 @@ impl FixedSchemaTable {
     }
 
     fn new_row_group(&self, rows: Vec<Row>) -> RowGroup {
-        RowGroupBuilder::with_rows(self.create_request.params.table_schema.clone(), rows)
-            .unwrap()
-            .build()
+        RowGroup::try_new(self.create_request.params.table_schema.clone(), rows).unwrap()
     }
 
     fn new_row_opt(data: RowTupleOpt) -> Row {
diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs
index 7d861c4bfd..790170803c 100644
--- a/common_types/src/row/mod.rs
+++ b/common_types/src/row/mod.rs
@@ -15,7 +15,6 @@
 //! Row type
 
 use std::{
-    cmp,
     collections::HashMap,
     ops::{Index, IndexMut},
 };
@@ -252,14 +251,30 @@ pub struct RowGroup {
     schema: Schema,
     /// Rows in the row group
     rows: Vec<Row>,
-    // TODO(yingwen): Maybe remove min/max timestamp
-    /// Min timestamp of all the rows
-    min_timestamp: Timestamp,
-    /// Max timestamp of all the rows
-    max_timestamp: Timestamp,
 }
 
 impl RowGroup {
+    /// Create [RowGroup] without any check.
+    ///
+    /// The caller should ensure all the rows share the same schema as the
+    /// provided one.
+    #[inline]
+    pub fn new_unchecked(schema: Schema, rows: Vec<Row>) -> Self {
+        Self { schema, rows }
+    }
+
+    /// Check and create row group.
+    ///
+    /// An error will be returned if any row has a different schema from the
+    /// provided one.
+    #[inline]
+    pub fn try_new(schema: Schema, rows: Vec<Row>) -> Result<Self> {
+        rows.iter()
+            .try_for_each(|row| check_row_schema(row, &schema))?;
+
+        Ok(Self { schema, rows })
+    }
+
     /// Returns true if the row group is empty
     #[inline]
     pub fn is_empty(&self) -> bool {
@@ -318,18 +333,6 @@ impl RowGroup {
             iter: self.rows.iter(),
         }
     }
-
-    /// Get the min timestamp of rows
-    #[inline]
-    pub fn min_timestamp(&self) -> Timestamp {
-        self.min_timestamp
-    }
-
-    /// Get the max timestamp of rows
-    #[inline]
-    pub fn max_timestamp(&self) -> Timestamp {
-        self.max_timestamp
-    }
 }
 
 impl<'a> IntoIterator for &'a RowGroup {
@@ -433,8 +436,6 @@ impl RowGroupBuilderFromColumn {
             return RowGroup {
                 schema: self.schema,
                 rows: vec![],
-                min_timestamp: Timestamp::new(0),
-                max_timestamp: Timestamp::new(0),
             };
         };
@@ -470,17 +471,9 @@ impl RowGroupBuilderFromColumn {
             }
         }
 
-        let rows = rows.into_iter().map(Row::from_datums).collect::<Vec<_>>();
-
-        let (min_timestamp, max_timestamp) = self
-            .collect_minmax_timestamps(&rows)
-            .unwrap_or_else(|| (Timestamp::default(), Timestamp::default()));
-
         RowGroup {
             schema: self.schema,
-            rows,
-            min_timestamp,
-            max_timestamp,
+            rows: rows.into_iter().map(Row::from_datums).collect::<Vec<_>>(),
         }
     }
 
@@ -488,107 +481,6 @@ impl RowGroupBuilderFromColumn {
     fn num_rows(&self) -> Option<usize> {
         self.cols.iter().next().map(|(_, v)| v.len())
     }
-
-    fn collect_minmax_timestamps(&self, rows: &[Row]) -> Option<(Timestamp, Timestamp)> {
-        let timestamp_idx = self.schema.timestamp_index();
-        if rows.is_empty() {
-            return None;
-        }
-
-        rows.iter()
-            .fold(None, |prev: Option<(Timestamp, Timestamp)>, row| {
-                let timestamp = row[timestamp_idx].as_timestamp()?;
-                match prev {
-                    None => Some((timestamp, timestamp)),
-                    Some((min_ts, max_ts)) => Some((min_ts.min(timestamp), max_ts.max(timestamp))),
-                }
-            })
-    }
-}
-
-/// RowGroup builder
-#[derive(Debug)]
-pub struct RowGroupBuilder {
-    schema: Schema,
-    rows: Vec<Row>,
-    min_timestamp: Option<Timestamp>,
-    max_timestamp: Timestamp,
-}
-
-impl RowGroupBuilder {
-    /// Create a new builder
-    pub fn new(schema: Schema) -> Self {
-        Self::with_capacity(schema, 0)
-    }
-
-    /// Create a new builder with given capacity
-    pub fn with_capacity(schema: Schema, capacity: usize) -> Self {
-        Self {
-            schema,
-            rows: Vec::with_capacity(capacity),
-            min_timestamp: None,
-            max_timestamp: Timestamp::new(0),
-        }
-    }
-
-    /// Create a new builder with schema and rows
-    ///
-    /// Return error if the `rows` do not matched the `schema`
-    pub fn with_rows(schema: Schema, rows: Vec<Row>) -> Result<Self> {
-        let mut row_group = Self::new(schema);
-
-        // Check schema and update min/max timestamp
-        for row in &rows {
-            check_row_schema(row, &row_group.schema)?;
-            row_group.update_timestamps(row);
-        }
-
-        row_group.rows = rows;
-
-        Ok(row_group)
-    }
-
-    /// Add a schema checked row
-    ///
-    /// REQUIRE: Caller should ensure the schema of row must equal to the schema
-    /// of this builder
-    pub fn push_checked_row(&mut self, row: Row) {
-        self.update_timestamps(&row);
-
-        self.rows.push(row);
-    }
-
-    /// Acquire builder to build next row of the row group
-    pub fn row_builder(&mut self) -> RowBuilder {
-        RowBuilder {
-            // schema: &self.schema,
-            cols: Vec::with_capacity(self.schema.num_columns()),
-            // rows: &mut self.rows,
-            group_builder: self,
-        }
-    }
-
-    /// Build the row group
-    pub fn build(self) -> RowGroup {
-        RowGroup {
-            schema: self.schema,
-            rows: self.rows,
-            min_timestamp: self.min_timestamp.unwrap_or_else(|| Timestamp::new(0)),
-            max_timestamp: self.max_timestamp,
-        }
-    }
-
-    /// Update min/max timestamp of the row group
-    fn update_timestamps(&mut self, row: &Row) {
-        // check_row_schema() ensures this datum is a timestamp, so we just unwrap here
-        let row_timestamp = row.timestamp(&self.schema).unwrap();
-
-        self.min_timestamp = match self.min_timestamp {
-            Some(min_timestamp) => Some(cmp::min(min_timestamp, row_timestamp)),
-            None => Some(row_timestamp),
-        };
-        self.max_timestamp = cmp::max(self.max_timestamp, row_timestamp);
-    }
 }
 
 /// Check whether the datum kind matches the column schema
@@ -615,16 +507,21 @@ pub fn check_datum_type(datum: &Datum, column_schema: &ColumnSchema) -> Result<()> {
     Ok(())
 }
 
-// TODO(yingwen): This builder is used to build RowGroup, need to provide a
-// builder to build one row
 /// Row builder for the row group
 #[derive(Debug)]
 pub struct RowBuilder<'a> {
-    group_builder: &'a mut RowGroupBuilder,
+    schema: &'a Schema,
     cols: Vec<Datum>,
 }
 
 impl<'a> RowBuilder<'a> {
+    pub fn new(schema: &'a Schema) -> RowBuilder<'a> {
+        Self {
+            schema,
+            cols: Vec::with_capacity(schema.num_columns()),
+        }
+    }
+
     /// Append a datum into the row
     pub fn append_datum(mut self, datum: Datum) -> Result<Self> {
         self.check_datum(&datum)?;
@@ -637,28 +534,23 @@ impl<'a> RowBuilder<'a> {
     /// Check whether the datum is valid
     fn check_datum(&self, datum: &Datum) -> Result<()> {
         let index = self.cols.len();
-        let schema = &self.group_builder.schema;
         ensure!(
-            index < schema.num_columns(),
+            index < self.schema.num_columns(),
             ColumnOutOfBound {
-                len: schema.num_columns(),
+                len: self.schema.num_columns(),
                 given: index,
             }
         );
 
-        let column = schema.column(index);
+        let column = self.schema.column(index);
         check_datum_type(datum, column)
     }
 
-    /// Finish building this row and append this row into the row group
-    pub fn finish(self) -> Result<()> {
-        ensure!(
-            self.cols.len() == self.group_builder.schema.num_columns(),
-            MissingColumns
-        );
+    /// Finish building this row
+    pub fn finish(self) -> Result<Row> {
+        ensure!(self.cols.len() == self.schema.num_columns(), MissingColumns);
 
-        self.group_builder.push_checked_row(Row { cols: self.cols });
-        Ok(())
+        Ok(Row { cols: self.cols })
     }
 }
diff --git a/partition_table_engine/src/partition.rs b/partition_table_engine/src/partition.rs
index 21af54da20..23724b033d 100644
--- a/partition_table_engine/src/partition.rs
+++ b/partition_table_engine/src/partition.rs
@@ -19,7 +19,7 @@ use std::{collections::HashMap, fmt};
 use analytic_engine::{table::support_pushdown, TableOptions};
 use async_trait::async_trait;
 use common_types::{
-    row::{Row, RowGroup, RowGroupBuilder},
+    row::{Row, RowGroup},
     schema::Schema,
 };
 use futures::{stream::FuturesUnordered, StreamExt};
@@ -46,7 +46,7 @@ use table_engine::{
     table::{
         AlterOptions, AlterSchema, AlterSchemaRequest, CreatePartitionRule, FlushRequest,
         GetRequest, LocatePartitions, ReadRequest, Result, Scan, Table, TableId, TableStats,
-        UnexpectedWithMsg, UnsupportedMethod, Write, WriteBatch, WriteRequest,
+        UnexpectedWithMsg, UnsupportedMethod, WriteBatch, WriteRequest,
     },
 };
@@ -139,12 +139,9 @@ impl PartitionTableImpl {
         let mut request_batch = Vec::with_capacity(split_rows.len());
         for (partition, rows) in split_rows {
             let sub_table_ident = self.get_sub_table_ident(partition);
-            let row_group = RowGroupBuilder::with_rows(schema.clone(), rows)
-                .box_err()
-                .with_context(|| Write {
-                    table: sub_table_ident.table.clone(),
-                })?
-                .build();
+            // The rows should already have a valid schema, so there is no need to do
+            // one more check here.
+            let row_group = RowGroup::new_unchecked(schema.clone(), rows);
 
             let request = RemoteWriteRequest {
                 table: sub_table_ident,
diff --git a/proxy/src/write.rs b/proxy/src/write.rs
index 2f07f43d04..05a7dfeef2 100644
--- a/proxy/src/write.rs
+++ b/proxy/src/write.rs
@@ -30,7 +30,7 @@ use common_types::{
     column_schema::ColumnSchema,
     datum::{Datum, DatumKind},
     request_id::RequestId,
-    row::{Row, RowGroupBuilder},
+    row::{Row, RowGroup},
     schema::Schema,
     time::Timestamp,
 };
@@ -808,7 +808,8 @@ fn write_table_request_to_insert_plan(
 ) -> Result<InsertPlan> {
     let schema = table.schema();
 
-    let mut rows_total = Vec::new();
+    // TODO: pre-allocate the memory for the row vector.
+    let mut total_rows = Vec::new();
     for write_entry in write_table_req.entries {
         let mut rows = write_entry_to_rows(
             &write_table_req.table,
@@ -817,16 +818,15 @@ fn write_table_request_to_insert_plan(
             &write_table_req.field_names,
             write_entry,
         )?;
-        rows_total.append(&mut rows);
+        total_rows.append(&mut rows);
     }
 
-    // The row group builder will checks nullable.
-    let row_group = RowGroupBuilder::with_rows(schema, rows_total)
+    // `RowGroup::try_new` validates the rows against the schema, e.g. the
+    // nullable check.
+    let row_group = RowGroup::try_new(schema, total_rows)
         .box_err()
         .with_context(|| ErrWithCause {
             code: StatusCode::INTERNAL_SERVER_ERROR,
             msg: format!("Failed to build row group, table:{}", table.name()),
-        })?
-        .build();
+        })?;
 
     Ok(InsertPlan {
         table,
         rows: row_group,
diff --git a/query_engine/src/datafusion_impl/physical_plan.rs b/query_engine/src/datafusion_impl/physical_plan.rs
index c0bf99ea00..a2d1ddf710 100644
--- a/query_engine/src/datafusion_impl/physical_plan.rs
+++ b/query_engine/src/datafusion_impl/physical_plan.rs
@@ -112,7 +112,7 @@ impl PhysicalPlan for DataFusionPhysicalPlanAdapter {
         info!(
             "DatafusionExecutorImpl get the executable plan, request_id:{}, physical_plan:{}",
             df_task_ctx.ctx.request_id,
-            displayable(executable.as_ref()).indent(true).to_string()
+            displayable(executable.as_ref()).indent(true)
         );
 
         // Kept the executed plan.
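Taken together, the changes so far replace the builder-centric flow (RowGroupBuilder::with_rows / push_checked_row / build) with direct construction: RowGroup::try_new for callers that still need per-row schema validation, RowGroup::new_unchecked for rows already known to match the schema, and a standalone RowBuilder for assembling a single row. A minimal sketch of the new usage, based only on the signatures introduced in common_types/src/row/mod.rs above; the two-column schema layout and the helper name are illustrative assumptions, not code from this diff:

use common_types::{
    datum::Datum,
    row::{self, Row, RowBuilder, RowGroup},
    schema::Schema,
    time::Timestamp,
};

/// Hypothetical helper: `schema` is assumed to describe two columns,
/// (uint64 tsid, timestamp), in that order; building such a schema is
/// elided here.
fn example_row_group(schema: Schema, timestamps: &[i64]) -> Result<RowGroup, row::Error> {
    let mut rows = Vec::with_capacity(timestamps.len());
    for (tsid, ts) in timestamps.iter().enumerate() {
        // `append_datum` type-checks each datum against the schema, and
        // `finish` now returns the built `Row` instead of pushing it into a
        // builder-owned group.
        let row: Row = RowBuilder::new(&schema)
            .append_datum(Datum::UInt64(tsid as u64))?
            .append_datum(Datum::Timestamp(Timestamp::new(*ts)))?
            .finish()?;
        rows.push(row);
    }

    // Checked construction: validates every row against the schema once more.
    // Call sites that just produced the rows with this same schema can skip
    // the re-validation via `RowGroup::new_unchecked(schema, rows)` instead.
    RowGroup::try_new(schema, rows)
}

The split makes the validation cost explicit: try_new pays one pass over the rows, while new_unchecked is reserved for rows produced with the same schema in hand (WAL replay, remote-engine decoding, planner output), which is how the call sites in this diff use it.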
diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs
index 9245c2ca36..b1cc364c2b 100644
--- a/query_frontend/src/planner.rs
+++ b/query_frontend/src/planner.rs
@@ -35,7 +35,7 @@ use common_types::{
     column_schema::{self, ColumnSchema},
     datum::{Datum, DatumKind},
     request_id::RequestId,
-    row::{RowGroup, RowGroupBuilder},
+    row::{RowBuilder, RowGroup},
     schema::{self, Builder as SchemaBuilder, Schema, TSID_COLUMN},
 };
 use datafusion::{
@@ -1145,12 +1145,12 @@ fn build_row_group(
     match *source.body {
         SetExpr::Values(Values {
             explicit_row: _,
-            rows,
+            rows: expr_rows,
         }) => {
-            let mut row_group_builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len());
-            for mut exprs in rows {
+            let mut rows = Vec::with_capacity(expr_rows.len());
+            for mut exprs in expr_rows {
                 // Try to build row
-                let mut row_builder = row_group_builder.row_builder();
+                let mut row_builder = RowBuilder::new(&schema);
 
                 // For each column in schema, append datum into row builder
                 for (index_opt, column_schema) in
@@ -1183,11 +1183,12 @@ fn build_row_group(
                 }
 
                 // Finish this row and append into row group
-                row_builder.finish().context(BuildRow)?;
+                let row = row_builder.finish().context(BuildRow)?;
+                rows.push(row);
             }
 
             // Build the whole row group
-            Ok(row_group_builder.build())
+            Ok(RowGroup::new_unchecked(schema, rows))
         }
         _ => InsertSourceBodyNotSet.fail(),
     }
@@ -1898,12 +1899,6 @@ mod tests {
                             ],
                         },
                     ],
-                    min_timestamp: Timestamp(
-                        1638428434000,
-                    ),
-                    max_timestamp: Timestamp(
-                        1638428434000,
-                    ),
                 },
                 default_value_map: {},
             },
diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs
index dcef633ca6..993e3dc213 100644
--- a/system_catalog/src/sys_catalog_table.rs
+++ b/system_catalog/src/sys_catalog_table.rs
@@ -27,7 +27,7 @@ use common_types::{
     projected_schema::ProjectedSchema,
     record_batch::RecordBatch,
     request_id::RequestId,
-    row::{Row, RowGroup, RowGroupBuilder},
+    row::{Row, RowBuilder, RowGroup},
     schema::{self, Schema},
     table::DEFAULT_SHARD_ID,
     time::Timestamp,
@@ -152,9 +152,12 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Failed to build row for entry, err:{}", source))]
+    #[snafu(display("Failed to build row for entry, err:{source}"))]
     BuildRow { source: common_types::row::Error },
 
+    #[snafu(display("Failed to build row group, err:{source}"))]
+    BuildRowGroup { source: common_types::row::Error },
+
     #[snafu(display(
         "Failed to decode protobuf for entry, err:{}.\nBacktrace:\n{}",
         source,
@@ -860,9 +863,7 @@ impl CreateCatalogRequest {
     fn into_row_group(self, schema: Schema) -> Result<RowGroup> {
         let key = self.to_key()?;
         let value = self.into_bytes();
-        let mut builder = RowGroupBuilder::new(schema);
-        builder
-            .row_builder()
+        let row = RowBuilder::new(&schema)
             // key
             .append_datum(Datum::Varbinary(key))
             .context(BuildRow)?
@@ -875,7 +876,7 @@ impl CreateCatalogRequest {
             .finish()
             .context(BuildRow)?;
 
-        Ok(builder.build())
+        RowGroup::try_new(schema, vec![row]).context(BuildRowGroup)
     }
 
     fn to_key(&self) -> Result<Bytes> {
@@ -922,9 +923,7 @@ impl CreateSchemaRequest {
     fn into_row_group(self, schema: Schema) -> Result<RowGroup> {
         let key = self.to_key()?;
        let value = self.into_bytes();
-        let mut builder = RowGroupBuilder::new(schema);
-        builder
-            .row_builder()
+        let row = RowBuilder::new(&schema)
             // key
             .append_datum(Datum::Varbinary(key))
             .context(BuildRow)?
@@ -937,7 +936,7 @@ impl CreateSchemaRequest {
             .finish()
             .context(BuildRow)?;
 
-        Ok(builder.build())
+        Ok(RowGroup::new_unchecked(schema, vec![row]))
     }
 
     fn to_key(&self) -> Result<Bytes> {
@@ -1009,7 +1008,7 @@ impl TableWriter {
     /// Convert the table to write into [common_types::row::RowGroup].
     fn convert_table_info_to_row_group(&self) -> Result<RowGroup> {
-        let mut builder = RowGroupBuilder::new(self.catalog_table.schema());
+        let schema = self.catalog_table.schema();
         let key = Self::build_create_table_key(&self.table_to_write)?;
         let value = Self::build_create_table_value(self.table_to_write.clone(), self.typ)?;
@@ -1018,14 +1017,14 @@ impl TableWriter {
             key,
             value
         );
-        Self::build_row(&mut builder, key, value)?;
+        let row = Self::build_table_info_row(&schema, key, value)?;
+        let row_group = RowGroup::new_unchecked(schema, vec![row]);
 
-        Ok(builder.build())
+        Ok(row_group)
     }
 
-    fn build_row(builder: &mut RowGroupBuilder, key: Bytes, value: Bytes) -> Result<()> {
-        builder
-            .row_builder()
+    fn build_table_info_row(schema: &Schema, key: Bytes, value: Bytes) -> Result<Row> {
+        RowBuilder::new(schema)
             // key
             .append_datum(Datum::Varbinary(key))
             .context(BuildRow)?
@@ -1036,8 +1035,7 @@ impl TableWriter {
             .append_datum(Datum::Varbinary(value))
             .context(BuildRow)?
             .finish()
-            .context(BuildRow)?;
-        Ok(())
+            .context(BuildRow)
     }
 
     fn build_create_table_key(table_info: &TableInfo) -> Result<Bytes> {
diff --git a/table_engine/src/partition/rule/df_adapter/mod.rs b/table_engine/src/partition/rule/df_adapter/mod.rs
index 69bee374ea..a3bab167b6 100644
--- a/table_engine/src/partition/rule/df_adapter/mod.rs
+++ b/table_engine/src/partition/rule/df_adapter/mod.rs
@@ -78,7 +78,7 @@ mod tests {
     use common_types::{
         column_schema,
         datum::{Datum, DatumKind},
-        row::RowGroupBuilder,
+        row::RowBuilder,
         schema::{Builder, Schema, TSID_COLUMN},
         string::StringBytes,
         time::Timestamp,
@@ -199,9 +199,7 @@ mod tests {
             ],
         ];
 
-        let mut row_group_builder = RowGroupBuilder::new(schema.clone());
-        row_group_builder
-            .row_builder()
+        let row0 = RowBuilder::new(&schema)
             .append_datum(Datum::UInt64(0))
             .unwrap()
             .append_datum(Datum::Timestamp(Timestamp::new(0)))
             .unwrap()
             .append_datum(Datum::String(StringBytes::copy_from_str(
             .unwrap()
             .append_datum(Datum::UInt64(0))
             .unwrap()
             .finish()
             .unwrap();
-        row_group_builder
-            .row_builder()
+        let row1 = RowBuilder::new(&schema)
             .append_datum(Datum::UInt64(1))
             .unwrap()
             .append_datum(Datum::Timestamp(Timestamp::new(1)))
             .unwrap()
             .append_datum(Datum::String(StringBytes::copy_from_str(
             .unwrap()
             .append_datum(Datum::UInt64(1))
             .unwrap()
             .finish()
             .unwrap();
-        let row_group = row_group_builder.build();
+        let row_group = RowGroup::new_unchecked(schema.clone(), vec![row0, row1]);
 
         // Basic flow
         let key_rule_adapter =
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 39cc23e50b..3b51ddfa73 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -27,7 +27,7 @@ use common_types::{
     request_id::RequestId,
     row::{
         contiguous::{ContiguousRow, ContiguousRowReader, ContiguousRowWriter},
-        Row, RowGroup, RowGroupBuilder,
+        Row, RowGroup,
     },
     schema::{IndexInWriterSchema, RecordSchema, Schema, Version},
 };
@@ -195,8 +195,7 @@ impl WriteRequest {
     ) -> Result<RowGroup> {
         validate_contiguous_payload_schema(schema, &payload.column_descs)?;
 
-        let mut row_group_builder =
-            RowGroupBuilder::with_capacity(schema.clone(), payload.encoded_rows.len());
+        let mut rows = Vec::with_capacity(payload.encoded_rows.len());
         for encoded_row in payload.encoded_rows {
             let reader = ContiguousRowReader::try_new(&encoded_row, schema)
                 .box_err()
@@ -209,16 +208,17 @@ impl WriteRequest {
                 // from the DatumView.
                 datums.push(datum_view.to_datum());
             }
-            row_group_builder.push_checked_row(Row::from_datums(datums));
+            rows.push(Row::from_datums(datums));
         }
-        Ok(row_group_builder.build())
+
+        // The rows are decoded according to the schema, so there is no need to do
+        // one more check here.
+        Ok(RowGroup::new_unchecked(schema.clone(), rows))
     }
 
     pub fn convert_into_pb(self) -> Result<ceresdbproto::remote_engine::WriteRequest> {
         let row_group = self.write_request.row_group;
         let table_schema = row_group.schema();
-        let min_timestamp = row_group.min_timestamp().as_i64();
-        let max_timestamp = row_group.max_timestamp().as_i64();
         let mut encoded_rows = Vec::with_capacity(row_group.num_rows());
 
         // TODO: The schema of the written row group may be different from the original
@@ -244,8 +244,9 @@ impl WriteRequest {
             column_descs,
         };
         let row_group_pb = ceresdbproto::remote_engine::RowGroup {
-            min_timestamp,
-            max_timestamp,
+            // Deprecated: the two timestamps are not used anymore.
+            min_timestamp: 0,
+            max_timestamp: 0,
             rows: Some(Contiguous(contiguous_rows)),
         };
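With min_timestamp/max_timestamp dropped from RowGroup and written as deprecated zeros in the protobuf, a caller that still needs the bounds has to derive them from the rows. A sketch of such a helper (hypothetical, not part of this diff), mirroring the removed collect_minmax_timestamps:

use common_types::{row::RowGroup, time::Timestamp};

/// Hypothetical helper: returns the min/max timestamps over all rows, or
/// None if the group is empty.
fn minmax_timestamps(row_group: &RowGroup) -> Option<(Timestamp, Timestamp)> {
    let timestamp_idx = row_group.schema().timestamp_index();
    row_group
        .into_iter()
        .fold(None, |prev: Option<(Timestamp, Timestamp)>, row| {
            // `as_timestamp` returns None for a non-timestamp datum; the `?`
            // then resets the accumulator, matching the behavior of the
            // removed implementation.
            let ts = row[timestamp_idx].as_timestamp()?;
            Some(match prev {
                None => (ts, ts),
                Some((min_ts, max_ts)) => (min_ts.min(ts), max_ts.max(ts)),
            })
        })
}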