chore: remove the codes about the reverse reading (#1116)
## Rationale
Reading the primary key in reverse order is not supported yet, so remove the code related to it.

## Detailed Changes
Remove the reverse-read code from the plan stage and the SST reading stage, while keeping the ability to scan the memtable in reverse order for possible future use.
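
To illustrate the effect on the read path: after this change, whether merge sorting is applied no longer depends on the requested read order, only on whether the table needs deduplication. Below is a minimal sketch with stand-in types (`TableOptions` and its `dedup` field are illustrative here, not the engine's real definitions):

```rust
// Stand-in for the engine's table options; only the dedup flag matters here.
struct TableOptions {
    dedup: bool,
}

impl TableOptions {
    fn need_dedup(&self) -> bool {
        self.dedup
    }
}

// After this commit the decision reduces to `need_dedup()`; previously it was
// `table_options.need_dedup() || read_request.order.is_in_order()`.
fn need_merge_sort(table_options: &TableOptions) -> bool {
    table_options.need_dedup()
}

fn main() {
    let dedup_table = TableOptions { dedup: true };
    let append_only_table = TableOptions { dedup: false };
    assert!(need_merge_sort(&dedup_table));
    assert!(!need_merge_sort(&append_only_table));
}
```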

## Test Plan
Existing tests.
ShiKaiWi authored Aug 1, 2023
1 parent 5cc2541 commit ee10f2f
Showing 32 changed files with 61 additions and 1,196 deletions.
40 changes: 20 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions analytic_engine/src/compaction/picker.rs
@@ -410,7 +410,7 @@ impl SizeTieredPicker {
return None;
}

// Find the hotest bucket
// Find the hottest bucket
if let Some((bucket, hotness)) =
pruned_bucket_and_hotness
.into_iter()
@@ -419,12 +419,12 @@ impl SizeTieredPicker {
if !c.is_eq() {
return c;
}
//TODO(boyan), compacting smallest sstables first?
// TODO(boyan), compacting smallest sstables first?
b1.avg_size.cmp(&b2.avg_size)
})
{
debug!(
"Find the hotest bucket, hotness: {}, bucket: {:?}",
"Find the hottest bucket, hotness: {}, bucket: {:?}",
hotness, bucket
);
Some(bucket.files)
1 change: 0 additions & 1 deletion analytic_engine/src/instance/flush_compaction.rs
@@ -776,7 +776,6 @@ impl SpaceStore {
let table_options = table_data.table_options();
let projected_schema = ProjectedSchema::no_projection(schema.clone());
let sst_read_options = SstReadOptions {
reverse: false,
num_rows_per_row_group: table_options.num_rows_per_row_group,
frequency: ReadFrequency::Once,
projected_schema: projected_schema.clone(),
22 changes: 3 additions & 19 deletions analytic_engine/src/instance/read.rs
@@ -73,12 +73,6 @@ const ITER_NUM_METRIC_NAME: &str = "iter_num";
const MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "merge_iter";
const CHAIN_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "chain_iter";

/// Check whether it needs to apply merge sorting when reading the table with
/// the `table_options` by the `read_request`.
fn need_merge_sort_streams(table_options: &TableOptions, read_request: &ReadRequest) -> bool {
table_options.need_dedup() || read_request.order.is_in_order()
}

impl Instance {
/// Read data in multiple time range from table, and return
/// `read_parallelism` output streams.
@@ -95,7 +89,7 @@ impl Instance {
let table_options = table_data.table_options();
// Collect metrics.
table_data.metrics.on_read_request_begin();
let need_merge_sort = need_merge_sort_streams(&table_options, &request);
let need_merge_sort = table_options.need_dedup();
request.metrics_collector.collect(Metric::boolean(
MERGE_SORT_METRIC_NAME.to_string(),
need_merge_sort,
@@ -118,15 +112,10 @@ impl Instance {
fn build_partitioned_streams(
&self,
request: &ReadRequest,
mut partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
) -> Result<PartitionedStreams> {
let read_parallelism = request.opts.read_parallelism;

if read_parallelism == 1 && request.order.is_in_desc_order() {
// TODO(xikai): it seems this can be avoided.
partitioned_iters.reverse();
};

// Split iterators into `read_parallelism` groups.
let mut splitted_iters: Vec<_> = std::iter::repeat_with(Vec::new)
.take(read_parallelism)
@@ -157,7 +146,6 @@ impl Instance {
let sequence = table_data.last_sequence();
let projected_schema = request.projected_schema.clone();
let sst_read_options = SstReadOptions {
reverse: request.order.is_in_desc_order(),
frequency: ReadFrequency::Frequent,
projected_schema: projected_schema.clone(),
predicate: request.predicate.clone(),
@@ -191,7 +179,7 @@ impl Instance {
store_picker: self.space_store.store_picker(),
merge_iter_options: iter_options.clone(),
need_dedup: table_options.need_dedup(),
reverse: request.order.is_in_desc_order(),
reverse: false,
};

let merge_iter = MergeBuilder::new(merge_config)
@@ -226,11 +214,7 @@ impl Instance {
) -> Result<Vec<ChainIterator>> {
let projected_schema = request.projected_schema.clone();

assert!(request.order.is_out_of_order());

let sst_read_options = SstReadOptions {
// no need to read in order so just read in asc order by default.
reverse: false,
frequency: ReadFrequency::Frequent,
projected_schema: projected_schema.clone(),
predicate: request.predicate.clone(),
2 changes: 2 additions & 0 deletions analytic_engine/src/row_iter/merge.rs
@@ -105,6 +105,8 @@ pub struct MergeConfig<'a> {
pub merge_iter_options: IterOptions,

pub need_dedup: bool,
// TODO: Reading the sst in reverse order is not supported yet, so the output
// will not be as expected if this is set to true.
pub reverse: bool,
}

1 change: 0 additions & 1 deletion analytic_engine/src/sst/factory.rs
@@ -119,7 +119,6 @@ impl Default for ScanOptions {

#[derive(Debug, Clone)]
pub struct SstReadOptions {
pub reverse: bool,
pub frequency: ReadFrequency,
pub num_rows_per_row_group: usize,
pub projected_schema: ProjectedSchema,
1 change: 0 additions & 1 deletion analytic_engine/src/sst/parquet/writer.rs
@@ -467,7 +467,6 @@ mod tests {
let scan_options = ScanOptions::default();
// read sst back to test
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: 5,
projected_schema: reader_projected_schema,
7 changes: 3 additions & 4 deletions analytic_engine/src/table/mod.rs
@@ -26,9 +26,9 @@ use table_engine::{
stream::{PartitionedStreams, SendableRecordBatchStream},
table::{
AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get,
GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder,
ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites,
WaitForPendingWrites, Write, WriteRequest,
GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadRequest,
Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, WaitForPendingWrites,
Write, WriteRequest,
},
ANALYTIC_ENGINE_TYPE,
};
@@ -491,7 +491,6 @@ impl Table for TableImpl {
opts: ReadOptions::default(),
projected_schema: request.projected_schema,
predicate,
order: ReadOrder::None,
metrics_collector: MetricsCollector::new(GET_METRICS_COLLECTOR_NAME.to_string()),
};
let mut batch_stream = self
