Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/src/bin/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use datafusion::common::Result;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExprOrderingRef};
use datafusion::prelude::{col, SessionConfig, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_benchmarks::BenchmarkRun;
Expand Down Expand Up @@ -319,7 +319,7 @@ async fn exec_scan(

async fn exec_sort(
ctx: &SessionContext,
expr: &[PhysicalSortExpr],
expr: ExprOrderingRef<'_>,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::datasource::provider_as_source;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::expressions::ExprOrderingRef;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics,
Expand Down Expand Up @@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
fn output_ordering(&self) -> Option<ExprOrderingRef> {
None
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ use datafusion::{
},
prelude::SessionContext,
};
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_expr::{expressions::col, ExprOrdering, PhysicalSortExpr};
use futures::StreamExt;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -304,7 +304,7 @@ impl MergeBenchCase {
}

/// Make sort exprs for each column in `schema`
fn make_sort_exprs(schema: &Schema) -> Vec<PhysicalSortExpr> {
fn make_sort_exprs(schema: &Schema) -> ExprOrdering {
schema
.fields()
.iter()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion::{
physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan},
prelude::SessionContext,
};
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_expr::{expressions::col, ExprOrdering, PhysicalSortExpr};
use futures::StreamExt;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -288,7 +288,7 @@ impl SortBenchCasePreservePartitioning {
}

/// Make sort exprs for each column in `schema`
fn make_sort_exprs(schema: &Schema) -> Vec<PhysicalSortExpr> {
fn make_sort_exprs(schema: &Schema) -> ExprOrdering {
schema
.fields()
.iter()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use dashmap::DashMap;
use datafusion_common::ToDFSchema;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, PhysicalSortExpr};
use datafusion_physical_expr::{create_physical_expr, ExprOrdering, PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
Expand Down Expand Up @@ -611,7 +611,7 @@ impl ListingTable {
}

/// If file_sort_order is specified, creates the appropriate physical expressions
fn try_create_output_ordering(&self) -> Result<Option<Vec<PhysicalSortExpr>>> {
fn try_create_output_ordering(&self) -> Result<Option<ExprOrdering>> {
let file_sort_order =
if let Some(file_sort_order) = self.options.file_sort_order.as_ref() {
file_sort_order
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ mod tests {
use datafusion_expr::Operator;
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit, expressions::Column,
PhysicalExpr, PhysicalSortExpr,
ExprOrdering, PhysicalExpr, PhysicalSortExpr,
};
use std::ops::Deref;

Expand Down Expand Up @@ -982,9 +982,7 @@ mod tests {
parquet_exec_with_sort(None)
}

fn parquet_exec_with_sort(
output_ordering: Option<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
fn parquet_exec_with_sort(output_ordering: Option<ExprOrdering>) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ mod tests {
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
make_requirements_from_ordering, ExprOrderingRef, OrderingRequirement,
};

fn schema() -> SchemaRef {
Expand Down Expand Up @@ -1150,7 +1150,7 @@ mod tests {
self.input.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
fn output_ordering(&self) -> Option<ExprOrderingRef> {
self.input.output_ordering()
}

Expand All @@ -1159,10 +1159,8 @@ mod tests {
}

// model that it requires the output ordering of its input
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
.map(make_sort_requirements_from_exprs)]
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirement>> {
vec![self.output_ordering().map(make_requirements_from_ordering)]
}

fn with_new_children(
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use datafusion_physical_expr::utils::{
make_sort_exprs_from_requirements, ordering_satisfy,
ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_expr::{ExprOrderingRef, PhysicalExpr, PhysicalSortExpr};
use itertools::{concat, izip};
use std::iter::zip;
use std::sync::Arc;
Expand Down Expand Up @@ -766,7 +766,7 @@ fn remove_corresponding_sort_from_sub_plan(
}

/// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible.
fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&[PhysicalSortExpr]> {
fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> Result<ExprOrderingRef> {
if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
Ok(sort_exec.expr())
} else if let Some(sort_preserving_merge_exec) =
Expand Down Expand Up @@ -795,9 +795,9 @@ pub struct ColumnInfo {
/// remove physical sort expressions from the plan.
pub fn can_skip_sort(
partition_keys: &[Arc<dyn PhysicalExpr>],
required: &[PhysicalSortExpr],
required: ExprOrderingRef,
input_schema: &SchemaRef,
physical_ordering: &[PhysicalSortExpr],
physical_ordering: ExprOrderingRef,
) -> Result<(bool, bool)> {
if required.len() > physical_ordering.len() {
return Ok((false, false));
Expand Down
44 changes: 22 additions & 22 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use datafusion_physical_expr::utils::{
requirements_compatible,
};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement,
make_requirements_from_ordering, ExprOrdering, ExprOrderingRef, OrderingRequirement,
PhysicalSortRequirement,
};
use itertools::izip;
use std::ops::Deref;
use std::sync::Arc;

/// This is a "data class" we use within the [`EnforceSorting`] rule to push
Expand All @@ -45,10 +45,10 @@ pub(crate) struct SortPushDown {
/// Current plan
pub plan: Arc<dyn ExecutionPlan>,
/// Parent required sort ordering
required_ordering: Option<Vec<PhysicalSortRequirement>>,
required_ordering: Option<OrderingRequirement>,
/// The adjusted request sort ordering to children.
/// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirement>>>,
adjusted_request_ordering: Vec<Option<OrderingRequirement>>,
}

impl SortPushDown {
Expand Down Expand Up @@ -121,7 +121,7 @@ pub(crate) fn pushdown_sorts(
requirements: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
let plan = &requirements.plan;
let parent_required = requirements.required_ordering.as_deref();
let parent_required = requirements.required_ordering.as_ref();
const ERR_MSG: &str = "Expects parent requirement to contain something";
let err = || DataFusionError::Plan(ERR_MSG.to_string());
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
Expand All @@ -137,11 +137,11 @@ pub(crate) fn pushdown_sorts(
};
let required_ordering = new_plan
.output_ordering()
.map(make_sort_requirements_from_exprs);
.map(make_requirements_from_ordering);
// Since new_plan is a SortExec, we can safely get the 0th index.
let child = &new_plan.children()[0];
if let Some(adjusted) =
pushdown_requirement_to_children(child, required_ordering.as_deref())?
pushdown_requirement_to_children(child, required_ordering.as_ref())?
{
// Can push down requirements
Ok(Transformed::Yes(SortPushDown {
Expand Down Expand Up @@ -184,14 +184,14 @@ pub(crate) fn pushdown_sorts(

fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
parent_required: Option<&OrderingRequirement>,
) -> Result<Option<Vec<Option<OrderingRequirement>>>> {
const ERR_MSG: &str = "Expects parent requirement to contain something";
let err = || DataFusionError::Plan(ERR_MSG.to_string());
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].as_deref();
let request_child = required_input_ordering[0].as_ref();
let child_plan = plan.children()[0].clone();
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => {
Expand Down Expand Up @@ -238,7 +238,7 @@ fn pushdown_requirement_to_children(
};
try_pushdown_requirements_to_join(
plan,
Some(new_right_required.deref()),
Some(new_right_required.as_ref()),
parent_required_expr,
JoinSide::Right,
)
Expand Down Expand Up @@ -274,8 +274,8 @@ fn pushdown_requirement_to_children(
/// If the the parent requirements are more specific, push down the parent requirements
/// If they are not compatible, need to add Sort.
fn determine_children_requirement(
parent_required: Option<&[PhysicalSortRequirement]>,
request_child: Option<&[PhysicalSortRequirement]>,
parent_required: Option<&OrderingRequirement>,
request_child: Option<&OrderingRequirement>,
child_plan: Arc<dyn ExecutionPlan>,
) -> RequirementsCompatibility {
if requirements_compatible(request_child, parent_required, || {
Expand All @@ -296,16 +296,16 @@ fn determine_children_requirement(

fn try_pushdown_requirements_to_join(
plan: &Arc<dyn ExecutionPlan>,
parent_required: Option<&[PhysicalSortRequirement]>,
sort_expr: Vec<PhysicalSortExpr>,
parent_required: Option<&OrderingRequirement>,
sort_expr: ExprOrdering,
push_side: JoinSide,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
) -> Result<Option<Vec<Option<OrderingRequirement>>>> {
let child_idx = match push_side {
JoinSide::Left => 0,
JoinSide::Right => 1,
};
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[child_idx].as_deref();
let request_child = required_input_ordering[child_idx].as_ref();
let child_plan = plan.children()[child_idx].clone();
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => Ok(None),
Expand All @@ -329,7 +329,7 @@ fn try_pushdown_requirements_to_join(
}

fn expr_source_sides(
required_exprs: &[PhysicalSortExpr],
required_exprs: ExprOrderingRef,
join_type: JoinType,
left_columns_len: usize,
) -> Option<JoinSide> {
Expand Down Expand Up @@ -377,10 +377,10 @@ fn expr_source_sides(
}

fn shift_right_required(
parent_required: &[PhysicalSortRequirement],
parent_required: &OrderingRequirement,
left_columns_len: usize,
) -> Result<Vec<PhysicalSortRequirement>> {
let new_right_required: Vec<PhysicalSortRequirement> = parent_required
) -> Result<OrderingRequirement> {
let new_right_required = parent_required
.iter()
.filter_map(|r| {
r.expr.as_any().downcast_ref::<Column>().and_then(|col| {
Expand Down Expand Up @@ -410,7 +410,7 @@ enum RequirementsCompatibility {
/// Requirements satisfy
Satisfy,
/// Requirements compatible
Compatible(Option<Vec<PhysicalSortRequirement>>),
Compatible(Option<OrderingRequirement>),
/// Requirements not compatible
NonCompatible,
}
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::tree_node::Transformed;
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::ExprOrdering;
use std::sync::Arc;

/// Convenience rule for writing optimizers: recursively invoke
Expand Down Expand Up @@ -60,7 +60,7 @@ pub fn optimize_children(
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(
node: &mut Arc<dyn ExecutionPlan>,
sort_expr: Vec<PhysicalSortExpr>,
sort_expr: ExprOrdering,
) -> Result<()> {
// If the ordering requirement is already satisfied, do not add a sort.
if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
expressions, AggregateExpr, ExprOrderingRef, PhysicalExpr,
};
use std::any::Any;
use std::collections::HashMap;
Expand Down Expand Up @@ -354,7 +354,7 @@ impl ExecutionPlan for AggregateExec {
}
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
fn output_ordering(&self) -> Option<ExprOrderingRef> {
None
}

Expand Down Expand Up @@ -725,7 +725,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::{lit, ApproxDistinct, Count, Median};
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_expr::{AggregateExpr, ExprOrderingRef, PhysicalExpr};
use futures::{FutureExt, Stream};
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -991,7 +991,7 @@ mod tests {
Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
fn output_ordering(&self) -> Option<ExprOrderingRef> {
None
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::expressions::PhysicalSortExpr;
use super::expressions::ExprOrderingRef;
use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;

Expand Down Expand Up @@ -95,7 +95,7 @@ impl ExecutionPlan for AnalyzeExec {
Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
fn output_ordering(&self) -> Option<ExprOrderingRef> {
None
}

Expand Down
Loading