Skip to content

Commit

Permalink
refactor: remove unused min/max timestamp in the RowGroup (apache#1297)
Browse files Browse the repository at this point in the history
## Rationale
The min/max timestamp in `RowGroup` is no longer used.

## Detailed Changes
- Remove the min/max timestamp from the `RowGroup`
- Remove the `RowGroupBuilder`: without the min/max timestamp updating it is
too trivial to justify a separate builder type

## Test Plan
All the tests in the CI should pass.
  • Loading branch information
ShiKaiWi authored Nov 7, 2023
1 parent 2cb70f7 commit 617b166
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 242 deletions.
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ mod tests {
use common_types::{
column_schema::Builder as ColumnSchemaBuilder,
datum::{Datum, DatumKind},
row::{Row, RowGroupBuilder},
row::Row,
schema::Builder as SchemaBuilder,
time::Timestamp,
};
Expand All @@ -738,7 +738,7 @@ mod tests {
.primary_key_indexes(vec![0])
.build()
.unwrap();
let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
let row_group = RowGroup::try_new(schema, rows).unwrap();

(encoded_rows, row_group)
}
Expand Down
11 changes: 6 additions & 5 deletions analytic_engine/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use codec::{
Decoder,
};
use common_types::{
row::{RowGroup, RowGroupBuilder, RowGroupBuilderFromColumn},
row::{RowGroup, RowGroupBuilderFromColumn},
schema::Schema,
table::TableId,
};
Expand Down Expand Up @@ -202,18 +202,19 @@ impl ReadPayload {

// Consume and convert rows in pb
let encoded_rows = write_req_pb.rows;
let mut builder = RowGroupBuilder::with_capacity(schema.clone(), encoded_rows.len());
let mut rows = Vec::with_capacity(encoded_rows.len());
let row_decoder = WalRowDecoder::new(&schema);
for row_bytes in &encoded_rows {
let row = row_decoder
.decode(&mut row_bytes.as_slice())
.context(DecodeRow)?;
// We skip schema check here
builder.push_checked_row(row);
rows.push(row);
}

let row_group = builder.build();

// The `rows` are decoded according to the schema, so there is no need to do one
// more check here.
let row_group = RowGroup::new_unchecked(schema, rows);
Ok(Self::Write { row_group })
}

Expand Down
29 changes: 14 additions & 15 deletions analytic_engine/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{

use async_trait::async_trait;
use common_types::{
row::{Row, RowGroupBuilder},
row::{Row, RowGroup},
schema::Schema,
time::TimeRange,
};
Expand Down Expand Up @@ -251,20 +251,19 @@ fn merge_pending_write_requests(
assert!(!pending_writes.is_empty());

let mut last_req = pending_writes.pop().unwrap();
let last_rows = last_req.row_group.take_rows();
let schema = last_req.row_group.into_schema();
let mut row_group_builder = RowGroupBuilder::with_capacity(schema, num_pending_rows);

for mut pending_req in pending_writes {
let rows = pending_req.row_group.take_rows();
for row in rows {
row_group_builder.push_checked_row(row)
let total_rows = {
let mut rows = Vec::with_capacity(num_pending_rows);
for mut pending_req in pending_writes {
let mut pending_rows = pending_req.row_group.take_rows();
rows.append(&mut pending_rows);
}
}
for row in last_rows {
row_group_builder.push_checked_row(row);
}
let row_group = row_group_builder.build();
let mut last_rows = last_req.row_group.take_rows();
rows.append(&mut last_rows);
rows
};

let schema = last_req.row_group.into_schema();
let row_group = RowGroup::new_unchecked(schema, total_rows);
WriteRequest { row_group }
}

Expand Down Expand Up @@ -653,7 +652,7 @@ mod tests {
schema_rows.push(row);
}
let rows = row_util::new_rows_6(&schema_rows);
let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
let row_group = RowGroup::try_new(schema, rows).unwrap();
WriteRequest { row_group }
}

Expand Down
14 changes: 4 additions & 10 deletions analytic_engine/src/tests/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::{BTreeMap, HashMap};
use common_types::{
column_schema,
datum::DatumKind,
row::{RowGroup, RowGroupBuilder},
row::RowGroup,
schema::{self, Schema},
time::Timestamp,
};
Expand Down Expand Up @@ -232,9 +232,7 @@ async fn alter_schema_add_column_case<T: WalsOpener>(
),
];
let rows_vec = row_util::new_rows_8(&rows);
let row_group = RowGroupBuilder::with_rows(new_schema.clone(), rows_vec)
.unwrap()
.build();
let row_group = RowGroup::try_new(new_schema.clone(), rows_vec).unwrap();

// Write data with new schema.
test_ctx.write_to_table(table_name, row_group).await;
Expand Down Expand Up @@ -288,9 +286,7 @@ async fn alter_schema_add_column_case<T: WalsOpener>(
)),
];
let new_schema_row_group =
RowGroupBuilder::with_rows(new_schema.clone(), new_schema_rows.to_vec())
.unwrap()
.build();
RowGroup::try_new(new_schema.clone(), new_schema_rows.to_vec()).unwrap();

// Read data using new schema.
check_read_row_group(
Expand Down Expand Up @@ -337,9 +333,7 @@ async fn alter_schema_add_column_case<T: WalsOpener>(
),
];
let old_schema_rows_vec = row_util::new_rows_6(&old_schema_rows);
let old_schema_row_group = RowGroupBuilder::with_rows(old_schema.clone(), old_schema_rows_vec)
.unwrap()
.build();
let old_schema_row_group = RowGroup::try_new(old_schema.clone(), old_schema_rows_vec).unwrap();

// Read data using old schema.
check_read_row_group(
Expand Down
6 changes: 2 additions & 4 deletions analytic_engine/src/tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_types::{
projected_schema::ProjectedSchema,
record_batch::RecordBatch,
request_id::RequestId,
row::{Row, RowGroup, RowGroupBuilder},
row::{Row, RowGroup},
schema::{self, Schema},
table::DEFAULT_SHARD_ID,
time::Timestamp,
Expand Down Expand Up @@ -122,9 +122,7 @@ impl FixedSchemaTable {
}

fn new_row_group(&self, rows: Vec<Row>) -> RowGroup {
RowGroupBuilder::with_rows(self.create_request.params.table_schema.clone(), rows)
.unwrap()
.build()
RowGroup::try_new(self.create_request.params.table_schema.clone(), rows).unwrap()
}

fn new_row_opt(data: RowTupleOpt) -> Row {
Expand Down
Loading

0 comments on commit 617b166

Please sign in to comment.