feat: enhance windowed-sort optimizer rule #4910

Merged 7 commits on Oct 31, 2024.

3 changes: 2 additions & 1 deletion src/file-engine/src/engine.rs
@@ -91,8 +91,9 @@ impl RegionEngine for FileRegionEngine {
         request: ScanRequest,
     ) -> Result<RegionScannerRef, BoxedError> {
         let stream = self.handle_query(region_id, request).await?;
+        let metadata = self.get_metadata(region_id).await?;
         // We don't support enabling append mode for file engine.
-        let scanner = Box::new(SinglePartitionScanner::new(stream, false));
+        let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
         Ok(scanner)
     }

5 changes: 5 additions & 0 deletions src/mito2/src/read/seq_scan.rs
@@ -27,6 +27,7 @@
 use common_telemetry::tracing;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
 use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;

@@ -321,6 +322,10 @@ impl RegionScanner for SeqScan {
         let predicate = self.stream_ctx.input.predicate();
         predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
     }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.stream_ctx.input.mapper.metadata().clone()
+    }
 }

 impl DisplayAs for SeqScan {

5 changes: 5 additions & 0 deletions src/mito2/src/read/unordered_scan.rs
@@ -26,6 +26,7 @@
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
 use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};

 use crate::error::{PartitionOutOfRangeSnafu, Result};

@@ -229,6 +230,10 @@ impl RegionScanner for UnorderedScan {
         let predicate = self.stream_ctx.input.predicate();
         predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
     }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.stream_ctx.input.mapper.metadata().clone()
+    }
 }

 impl DisplayAs for UnorderedScan {

29 changes: 24 additions & 5 deletions src/query/src/optimizer/parallelize_scan.rs
@@ -17,6 +17,7 @@
 use std::sync::Arc;
 use common_telemetry::debug;
 use datafusion::config::ConfigOptions;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{DataFusionError, Result};

@@ -48,9 +49,16 @@ impl ParallelizeScan {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut first_order_expr = None;
+
         let result = plan
             .transform_down(|plan| {
-                if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
+                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                    // save the first order expr
+                    first_order_expr = sort_exec.expr().first().cloned();
+                } else if let Some(region_scan_exec) =
+                    plan.as_any().downcast_ref::<RegionScanExec>()
+                {
                     if region_scan_exec.is_partition_set() {
                         return Ok(Transformed::no(plan));
                     }

@@ -66,10 +74,21 @@
                         "Assign {total_range_num} ranges to {expected_partition_num} partitions"
                     );

-                    // sort the ranges in each partition
-                    // TODO(ruihang): smart sort!
-                    for ranges in partition_ranges.iter_mut() {
-                        ranges.sort_by(|a, b| a.start.cmp(&b.start));
+                    // Sort the ranges in each partition based on the order expr.
+                    //
+                    // This optimistically assumes that the first order expr is on the
+                    // time index column and skips validating it, which is harmless if
+                    // the assumption does not hold.
+                    if let Some(order_expr) = &first_order_expr
+                        && order_expr.options.descending
+                    {
+                        for ranges in partition_ranges.iter_mut() {
+                            ranges.sort_by(|a, b| b.end.cmp(&a.end));
+                        }
+                    } else {
+                        for ranges in partition_ranges.iter_mut() {
+                            ranges.sort_by(|a, b| a.start.cmp(&b.start));
+                        }
                     }

                     // update the partition ranges

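The new ordering policy is small but easy to misread in diff form. Below is a minimal, self-contained sketch of the same logic; `Range` and `order_ranges` are hypothetical stand-ins for `PartitionRange` and the loop above, not code from this PR.

// Per-partition range ordering: a descending time-index sort wants the
// latest range scanned first, so order by descending `end`; an ascending
// sort (or no sort expr at all) orders by ascending `start`.
#[derive(Debug, Clone, Copy)]
struct Range {
    start: i64, // range start timestamp (millis)
    end: i64,   // range end timestamp (millis)
}

fn order_ranges(ranges: &mut [Range], descending: bool) {
    if descending {
        ranges.sort_by(|a, b| b.end.cmp(&a.end));
    } else {
        ranges.sort_by(|a, b| a.start.cmp(&b.start));
    }
}

fn main() {
    let mut ranges = vec![
        Range { start: 2000, end: 3000 },
        Range { start: 0, end: 1000 },
        Range { start: 1000, end: 2000 },
    ];
    order_ranges(&mut ranges, true);
    assert_eq!(ranges[0].end, 3000); // latest range first for DESC
    order_ranges(&mut ranges, false);
    assert_eq!(ranges[0].start, 0); // earliest range first for ASC
}
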
29 changes: 21 additions & 8 deletions src/query/src/optimizer/windowed_sort.rs
@@ -77,7 +77,6 @@ impl WindowedSortPhysicalRule {
             };

             if let Some(first_sort_expr) = sort_exec.expr().first()
-                && !first_sort_expr.options.descending
                 && let Some(column_expr) = first_sort_expr
                     .expr
                     .as_any()

@@ -87,18 +86,28 @@
             } else {
                 return Ok(Transformed::no(plan));
             }

             let first_sort_expr = sort_exec.expr().first().unwrap().clone();
-            let part_sort_exec = Arc::new(PartSortExec::new(
-                first_sort_expr.clone(),
-                scanner_info.partition_ranges.clone(),
-                sort_exec.input().clone(),
-            ));
+
+            // PartSortExec is unnecessary if:
+            // - there is no tag column, and
+            // - the sort is ascending on the time index column
+            let new_input = if scanner_info.tag_columns.is_empty()
+                && !first_sort_expr.options.descending
+            {
+                sort_exec.input().clone()
+            } else {
+                Arc::new(PartSortExec::new(
+                    first_sort_expr.clone(),
+                    scanner_info.partition_ranges.clone(),
+                    sort_exec.input().clone(),
+                ))
+            };

             let windowed_sort_exec = WindowedSortExec::try_new(
                 first_sort_expr,
                 sort_exec.fetch(),
                 scanner_info.partition_ranges,
-                part_sort_exec,
+                new_input,
             )?;

             return Ok(Transformed {

@@ -119,11 +128,13 @@
 struct ScannerInfo {
     partition_ranges: Vec<Vec<PartitionRange>>,
     time_index: String,
+    tag_columns: Vec<String>,
 }

 fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
     let mut partition_ranges = None;
     let mut time_index = None;
+    let mut tag_columns = None;

     input.transform_up(|plan| {
         // Unappliable case, reset the state.

@@ -139,6 +150,7 @@
         if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
             partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
             time_index = region_scan_exec.time_index();
+            tag_columns = Some(region_scan_exec.tag_columns());

             // set distinguish_partition_ranges to true, this is an incorrect workaround
             region_scan_exec.with_distinguish_partition_range(true);

@@ -151,6 +163,7 @@
         ScannerInfo {
             partition_ranges: partition_ranges?,
             time_index: time_index?,
+            tag_columns: tag_columns?,
         }
     };

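The net effect on plan shape can be summarized in a hedged sketch: `PlanSketch` and `plan_shape` below are illustrative names, not real plan nodes, but the branch condition mirrors the rule above. PartSortExec is skipped only when each PartitionRange already streams out in query order, that is, no tag columns and an ascending sort on the time index.

// Illustrative plan-shape decision for the windowed-sort rule.
#[derive(Debug, PartialEq)]
enum PlanSketch {
    // WindowedSortExec directly over the scan: per-range output is already
    // ordered on the time index, so merging windows is enough.
    WindowedSortOnly,
    // PartSortExec under WindowedSortExec: each range must be sorted first
    // (tag columns present, or a descending time-index sort).
    PartSortThenWindowedSort,
}

fn plan_shape(has_tag_columns: bool, descending: bool) -> PlanSketch {
    if !has_tag_columns && !descending {
        PlanSketch::WindowedSortOnly
    } else {
        PlanSketch::PartSortThenWindowedSort
    }
}

fn main() {
    assert_eq!(plan_shape(false, false), PlanSketch::WindowedSortOnly);
    assert_eq!(plan_shape(true, false), PlanSketch::PartSortThenWindowedSort);
    assert_eq!(plan_shape(false, true), PlanSketch::PartSortThenWindowedSort);
}
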
1 change: 1 addition & 0 deletions src/query/src/part_sort.rs
@@ -175,6 +175,7 @@ struct PartSortStream {
     input_complete: bool,
     schema: SchemaRef,
     partition_ranges: Vec<PartitionRange>,
+    #[allow(dead_code)] // only read under #[cfg(debug_assertions)]
     partition: usize,
     cur_part_idx: usize,
     metrics: BaselineMetrics,

1 change: 1 addition & 0 deletions src/query/src/window_sort.rs
@@ -270,6 +270,7 @@ pub struct WindowedSortStream {
     /// working ranges promise once input stream get a value out of current range, future values will never be in this range
     all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
     /// The input partition ranges
+    #[allow(dead_code)] // only read under #[cfg(debug_assertions)]
     ranges: Vec<PartitionRange>,
     /// Execution metrics
     metrics: BaselineMetrics,

17 changes: 15 additions & 2 deletions src/store-api/src/region_engine.rs
@@ -265,6 +265,9 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
     /// Returns the schema of the record batches.
     fn schema(&self) -> SchemaRef;

+    /// Returns the metadata of the region.
+    fn metadata(&self) -> RegionMetadataRef;
+
     /// Prepares the scanner with the given partition ranges.
     ///
     /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.

@@ -414,18 +417,24 @@ pub struct SinglePartitionScanner {
     stream: Mutex<Option<SendableRecordBatchStream>>,
     schema: SchemaRef,
     properties: ScannerProperties,
+    metadata: RegionMetadataRef,
 }

 impl SinglePartitionScanner {
-    /// Creates a new [SinglePartitionScanner] with the given stream.
-    pub fn new(stream: SendableRecordBatchStream, append_mode: bool) -> Self {
+    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
+    pub fn new(
+        stream: SendableRecordBatchStream,
+        append_mode: bool,
+        metadata: RegionMetadataRef,
+    ) -> Self {
         let schema = stream.schema();
         Self {
             stream: Mutex::new(Some(stream)),
             schema,
             properties: ScannerProperties::default()
                 .with_parallelism(1)
                 .with_append_mode(append_mode),
+            metadata,
         }
     }
 }

@@ -468,6 +477,10 @@ impl RegionScanner for SinglePartitionScanner {
     fn has_predicate(&self) -> bool {
         false
     }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.metadata.clone()
+    }
 }

 impl DisplayAs for SinglePartitionScanner {

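With `metadata()` on the trait, every scanner hands the planner a cheap `Arc` handle to its region metadata; `RegionScanExec::tag_columns` in the next file relies on this. Here is a minimal sketch of the contract, using simplified stand-in types rather than the real store_api definitions.

use std::sync::Arc;

// Simplified stand-ins for store_api's RegionMetadata / RegionMetadataRef.
struct RegionMetadata {
    tag_columns: Vec<String>,
}
type RegionMetadataRef = Arc<RegionMetadata>;

trait RegionScanner {
    /// Mirrors the new trait method: expose region metadata at plan time.
    fn metadata(&self) -> RegionMetadataRef;
}

struct MyScanner {
    metadata: RegionMetadataRef,
}

impl RegionScanner for MyScanner {
    fn metadata(&self) -> RegionMetadataRef {
        // Cloning an Arc only bumps a refcount, so this is cheap to call
        // from the optimizer on every plan pass.
        self.metadata.clone()
    }
}

fn main() {
    let scanner = MyScanner {
        metadata: Arc::new(RegionMetadata {
            tag_columns: vec!["host".to_string()],
        }),
    };
    assert_eq!(scanner.metadata().tag_columns, vec!["host".to_string()]);
}
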
59 changes: 50 additions & 9 deletions src/table/src/table/scan.rs
@@ -154,6 +154,16 @@ impl RegionScanExec {
         .timestamp_column()
         .map(|x| x.name.clone())
     }
+
+    pub fn tag_columns(&self) -> Vec<String> {
+        self.scanner
+            .lock()
+            .unwrap()
+            .metadata()
+            .primary_key_columns()
+            .map(|col| col.column_schema.name.clone())
+            .collect()
+    }
 }

 impl ExecutionPlan for RegionScanExec {

@@ -301,41 +311,72 @@
 mod test {
     use std::sync::Arc;

+    use api::v1::SemanticType;
     use common_recordbatch::{RecordBatch, RecordBatches};
     use datafusion::prelude::SessionContext;
     use datatypes::data_type::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
-    use datatypes::vectors::Int32Vector;
+    use datatypes::vectors::{Int32Vector, TimestampMillisecondVector};
     use futures::TryStreamExt;
+    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
     use store_api::region_engine::SinglePartitionScanner;
+    use store_api::storage::RegionId;

     use super::*;

     #[tokio::test]
     async fn test_simple_table_scan() {
         let ctx = SessionContext::new();
-        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
-            "a",
-            ConcreteDataType::int32_datatype(),
-            false,
-        )]));
+        let schema = Arc::new(Schema::new(vec![
+            ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
+            ColumnSchema::new(
+                "b",
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            ),
+        ]));

         let batch1 = RecordBatch::new(
             schema.clone(),
-            vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
+            vec![
+                Arc::new(Int32Vector::from_slice([1, 2])) as _,
+                Arc::new(TimestampMillisecondVector::from_slice([1000, 2000])) as _,
+            ],
         )
         .unwrap();
         let batch2 = RecordBatch::new(
             schema.clone(),
-            vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
+            vec![
+                Arc::new(Int32Vector::from_slice([3, 4, 5])) as _,
+                Arc::new(TimestampMillisecondVector::from_slice([3000, 4000, 5000])) as _,
+            ],
        )
         .unwrap();

         let recordbatches =
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
         let stream = recordbatches.as_stream();

-        let scanner = Box::new(SinglePartitionScanner::new(stream, false));
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "b",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .primary_key(vec![1]);
+        let region_metadata = Arc::new(builder.build().unwrap());
+
+        let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
         let plan = RegionScanExec::new(scanner);
         let actual: SchemaRef = Arc::new(
             plan.properties
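
The new `tag_columns` accessor works because, in GreptimeDB's data model, the primary-key columns are the tag columns (the test above marks column "a" as both `SemanticType::Tag` and the primary key). A minimal sketch of that derivation, with hypothetical `ColumnMeta`/`Meta` types standing in for the real `ColumnMetadata`/`RegionMetadata`:

// Hypothetical, simplified metadata types; only the fields needed to show
// how tag names fall out of the primary key are kept.
struct ColumnMeta {
    name: String,
    is_primary_key: bool,
}

struct Meta {
    columns: Vec<ColumnMeta>,
}

impl Meta {
    // Primary-key columns double as tags, so collecting their names is
    // exactly what RegionScanExec::tag_columns does above.
    fn tag_columns(&self) -> Vec<String> {
        self.columns
            .iter()
            .filter(|c| c.is_primary_key)
            .map(|c| c.name.clone())
            .collect()
    }
}

fn main() {
    let meta = Meta {
        columns: vec![
            ColumnMeta { name: "host".into(), is_primary_key: true },
            ColumnMeta { name: "ts".into(), is_primary_key: false },
        ],
    };
    assert_eq!(meta.tag_columns(), vec!["host".to_string()]);
}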