Skip to content

Commit

Permalink
Deprecate LexOrderingRef and LexRequirementRef (#13233)
Browse files Browse the repository at this point in the history
* converted LexOrderingRef to &LexOrdering

* using  LexOrdering::from_ref fn  instead of directly cloning it

* using as_ref instread of &

* using as_ref

* removed commented code

* updated cargo lock

* updated LexRequirementRef to &LexRequirement

* fixed clippy issues

* fixed taplo error for cargo.toml in physical-expr-common

* removed commented code

* fixed clippy errors

* fixed clippy error

* fixes

* removed  LexOrdering::from_ref instead using clone and created LexOrdering::empty() fn

* Update mod.rs

---------

Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>
Co-authored-by: berkaysynnada <berkay.sahin@synnada.ai>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent 4f169ec commit 9005585
Show file tree
Hide file tree
Showing 48 changed files with 396 additions and 303 deletions.
39 changes: 18 additions & 21 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -70,31 +70,28 @@ impl RunOpt {
let sort_cases = vec![
(
"sort utf8",
vec![PhysicalSortExpr {
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("request_method", &schema)?,
options: Default::default(),
}],
}]),
),
(
"sort int",
vec![PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
}],
}]),
),
(
"sort decimal",
vec![
// sort decimal
PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
}]),
),
(
"sort integer tuple",
vec![
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
Expand All @@ -103,11 +100,11 @@ impl RunOpt {
expr: col("response_bytes", &schema)?,
options: Default::default(),
},
],
]),
),
(
"sort utf8 tuple",
vec![
LexOrdering::new(vec![
// sort utf8 tuple
PhysicalSortExpr {
expr: col("service", &schema)?,
Expand All @@ -125,11 +122,11 @@ impl RunOpt {
expr: col("image", &schema)?,
options: Default::default(),
},
],
]),
),
(
"sort mixed tuple",
vec![
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
Expand All @@ -142,7 +139,7 @@ impl RunOpt {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
]),
),
];
for (title, expr) in sort_cases {
Expand Down Expand Up @@ -170,13 +167,13 @@ impl RunOpt {

async fn exec_sort(
ctx: &SessionContext,
expr: LexOrderingRef<'_>,
expr: &LexOrdering,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
let exec = Arc::new(SortExec::new(expr.clone(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
Expand Down
33 changes: 17 additions & 16 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 14 additions & 13 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::LexOrderingRef;

use log::warn;

Expand Down Expand Up @@ -308,7 +307,7 @@ impl FileScanConfig {
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
file_groups: &[Vec<PartitionedFile>],
sort_order: LexOrderingRef,
sort_order: &LexOrdering,
) -> Result<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
// First Fit:
Expand Down Expand Up @@ -1113,17 +1112,19 @@ mod tests {
))))
.collect::<Vec<_>>(),
));
let sort_order = case
.sort
.into_iter()
.map(|expr| {
crate::physical_planner::create_physical_sort_expr(
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&ExecutionProps::default(),
)
})
.collect::<Result<Vec<_>>>()?;
let sort_order = LexOrdering {
inner: case
.sort
.into_iter()
.map(|expr| {
crate::physical_planner::create_physical_sort_expr(
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&ExecutionProps::default(),
)
})
.collect::<Result<Vec<_>>>()?,
};

let partitioned_files =
case.files.into_iter().map(From::from).collect::<Vec<_>>();
Expand Down
34 changes: 18 additions & 16 deletions datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
Expand All @@ -50,7 +50,7 @@ pub(crate) struct MinMaxStatistics {
impl MinMaxStatistics {
/// Sort order used to sort the statistics
#[allow(unused)]
pub fn sort_order(&self) -> LexOrderingRef {
pub fn sort_order(&self) -> &LexOrdering {
&self.sort_order
}

Expand All @@ -66,8 +66,8 @@ impl MinMaxStatistics {
}

pub fn new_from_files<'a>(
projected_sort_order: LexOrderingRef, // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
files: impl IntoIterator<Item = &'a PartitionedFile>,
) -> Result<Self> {
Expand Down Expand Up @@ -119,15 +119,17 @@ impl MinMaxStatistics {
projected_schema
.project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
);
let min_max_sort_order = sort_columns
.iter()
.zip(projected_sort_order.iter())
.enumerate()
.map(|(i, (col, sort))| PhysicalSortExpr {
expr: Arc::new(Column::new(col.name(), i)),
options: sort.options,
})
.collect::<Vec<_>>();
let min_max_sort_order = LexOrdering {
inner: sort_columns
.iter()
.zip(projected_sort_order.iter())
.enumerate()
.map(|(i, (col, sort))| PhysicalSortExpr {
expr: Arc::new(Column::new(col.name(), i)),
options: sort.options,
})
.collect::<Vec<_>>(),
};

let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
.iter()
Expand Down Expand Up @@ -167,7 +169,7 @@ impl MinMaxStatistics {
}

pub fn new(
sort_order: LexOrderingRef,
sort_order: &LexOrdering,
schema: &SchemaRef,
min_values: RecordBatch,
max_values: RecordBatch,
Expand Down Expand Up @@ -257,7 +259,7 @@ impl MinMaxStatistics {
Ok(Self {
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
sort_order: LexOrdering::from_ref(sort_order),
sort_order: sort_order.clone(),
})
}

Expand All @@ -278,7 +280,7 @@ impl MinMaxStatistics {
}

fn sort_columns_from_physical_sort_exprs(
sort_order: LexOrderingRef,
sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
sort_order
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
Expand Down Expand Up @@ -936,7 +936,11 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {

let new_plan = if should_preserve_ordering {
Arc::new(SortPreservingMergeExec::new(
LexOrdering::from_ref(input.plan.output_ordering().unwrap_or(&[])),
input
.plan
.output_ordering()
.unwrap_or(&LexOrdering::default())
.clone(),
input.plan.clone(),
)) as _
} else {
Expand Down
Loading

0 comments on commit 9005585

Please sign in to comment.