Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hash support for PhysicalExpr and PhysicalSortExpr #6625

Merged
merged 17 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 17 additions & 79 deletions datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use crate::{

use arrow::datatypes::SchemaRef;

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;

/// Represents a collection of [`EquivalentClass`] (equivalences
Expand All @@ -39,7 +40,7 @@ pub struct EquivalenceProperties<T = Column> {
schema: SchemaRef,
}

impl<T: PartialEq + Clone> EquivalenceProperties<T> {
impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

pub fn new(schema: SchemaRef) -> Self {
EquivalenceProperties {
classes: vec![],
Expand Down Expand Up @@ -113,33 +114,6 @@ impl<T: PartialEq + Clone> EquivalenceProperties<T> {
}
}

/// Returns the distinct elements of `in_data`.
///
/// Only the first occurrence of each value is kept, and the surviving elements
/// stay in their original relative order.
fn deduplicate_vector<T: PartialEq>(in_data: Vec<T>) -> Vec<T> {
    in_data.into_iter().fold(Vec::new(), |mut unique, item| {
        // `T` is only `PartialEq`, so membership is checked with a linear scan.
        if !unique.contains(&item) {
            unique.push(item);
        }
        unique
    })
}

/// Returns the index of the first element of `in_data` equal to `entry`,
/// or `None` when no element matches.
fn get_entry_position<T: PartialEq>(in_data: &[T], entry: &T) -> Option<usize> {
    for (idx, candidate) in in_data.iter().enumerate() {
        if candidate == entry {
            return Some(idx);
        }
    }
    None
}

/// Removes the first element equal to `entry` from `in_data`.
///
/// Returns `true` when an element was found and removed, and `false` when
/// `entry` is not present (in which case `in_data` is left untouched).
fn remove_from_vec<T: PartialEq>(in_data: &mut Vec<T>, entry: &T) -> bool {
    match get_entry_position(in_data, entry) {
        Some(position) => {
            in_data.remove(position);
            true
        }
        None => false,
    }
}

// Helper function to calculate column info recursively
fn get_column_indices_helper(
indices: &mut Vec<(usize, String)>,
Expand Down Expand Up @@ -187,20 +161,22 @@ pub struct EquivalentClass<T = Column> {
/// First element in the EquivalentClass
head: T,
/// Other equal columns
others: Vec<T>,
others: HashSet<T>,
}

impl<T: PartialEq + Clone> EquivalentClass<T> {
impl<T: Eq + Hash + Clone> EquivalentClass<T> {
pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
let others = deduplicate_vector(others);
EquivalentClass { head, others }
EquivalentClass {
head,
others: HashSet::from_iter(others),
}
}

/// Returns a reference to the head (first element) of this equivalence class.
pub fn head(&self) -> &T {
&self.head
}

pub fn others(&self) -> &[T] {
pub fn others(&self) -> &HashSet<T> {
&self.others
}

Expand All @@ -209,24 +185,20 @@ impl<T: PartialEq + Clone> EquivalentClass<T> {
}

pub fn insert(&mut self, col: T) -> bool {
if self.head != col && !self.others.contains(&col) {
self.others.push(col);
true
} else {
false
}
self.head != col && self.others.insert(col)
}

pub fn remove(&mut self, col: &T) -> bool {
let removed = remove_from_vec(&mut self.others, col);
// If we are removing the head, shift others so that its first entry becomes the new head.
let removed = self.others.remove(col);
// If we are removing the head, adjust others so that its first entry becomes the new head.
if !removed && *col == self.head {
let one_col = self.others.first().cloned();
if let Some(col) = one_col {
let removed = remove_from_vec(&mut self.others, &col);
if let Some(col) = self.others.iter().next().cloned() {
let removed = self.others.remove(&col);
self.head = col;
removed
} else {
// We don't allow empty equivalence classes, reject removal if one tries removing
// the only element in an equivalence class.
false
}
} else {
Expand Down Expand Up @@ -556,40 +528,6 @@ mod tests {
Ok(())
}

#[test]
fn test_deduplicate_vector() -> Result<()> {
    // Each case pairs an input with the expected output, where only the first
    // occurrence of every value survives.
    let cases = [
        (vec![1, 1, 2, 3, 3], vec![1, 2, 3]),
        (vec![1, 2, 3, 4, 3, 2, 1, 0], vec![1, 2, 3, 4, 0]),
    ];
    for (input, expected) in cases {
        assert_eq!(deduplicate_vector(input), expected);
    }
    Ok(())
}

#[test]
fn test_get_entry_position() -> Result<()> {
    // Duplicates in the haystack check that the first match wins.
    let haystack = [1, 1, 2, 3, 3];
    let cases = [(2, Some(2)), (1, Some(0)), (5, None)];
    for (needle, expected) in cases {
        assert_eq!(get_entry_position(&haystack, &needle), expected);
    }
    Ok(())
}

#[test]
fn test_remove_from_vec() -> Result<()> {
    let mut values = vec![1, 1, 2, 3, 3];
    // Each step removes one target and states the expected contents afterwards;
    // removing a missing value must leave the vector unchanged.
    let steps = [
        (5, vec![1, 1, 2, 3, 3]),
        (2, vec![1, 1, 3, 3]),
        (2, vec![1, 1, 3, 3]),
        (3, vec![1, 1, 3]),
        (3, vec![1, 1]),
    ];
    for (target, expected) in steps {
        remove_from_vec(&mut values, &target);
        assert_eq!(values, expected);
    }
    Ok(())
}

#[test]
fn test_get_column_infos() -> Result<()> {
let expr1 = Arc::new(Column::new("col1", 2)) as _;
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod adapter;
mod kernels;
mod kernels_arrow;

use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

use arrow::array::*;
Expand Down Expand Up @@ -96,7 +97,7 @@ use datafusion_expr::type_coercion::binary::{
use datafusion_expr::{ColumnarValue, Operator};

/// Binary expression
#[derive(Debug)]
#[derive(Debug, Hash)]
pub struct BinaryExpr {
left: Arc<dyn PhysicalExpr>,
op: Operator,
Expand Down Expand Up @@ -837,6 +838,11 @@ impl PhysicalExpr for BinaryExpr {
};
Ok(vec![left, right])
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for BinaryExpr {
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::borrow::Cow;
use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

use crate::expressions::try_cast;
Expand Down Expand Up @@ -51,7 +52,7 @@ type WhenThen = (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>);
/// [WHEN ...]
/// [ELSE result]
/// END
#[derive(Debug)]
#[derive(Debug, Hash)]
pub struct CaseExpr {
/// Optional base expression that can be compared to literal values in the "when" expressions
expr: Option<Arc<dyn PhysicalExpr>>,
Expand Down Expand Up @@ -348,6 +349,11 @@ impl PhysicalExpr for CaseExpr {
)?))
}
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for CaseExpr {
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::any::Any;
use std::fmt;
use std::hash::Hasher;
use std::sync::Arc;

use crate::intervals::Interval;
Expand Down Expand Up @@ -132,6 +133,12 @@ impl PhysicalExpr for CastExpr {
interval.cast_to(&cast_type, &self.cast_options)?,
)])
}

fn dyn_hash(&self, _state: &mut dyn Hasher) {
// `self.cast_options` doesn't support hashing
// Hence we cannot calculate `dyn_hash` for this type.
unimplemented!();
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl PartialEq<dyn Any> for CastExpr {
Expand Down
11 changes: 11 additions & 0 deletions datafusion/physical-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Column expression

use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::{
Expand Down Expand Up @@ -109,6 +110,11 @@ impl PhysicalExpr for Column {
let col_bounds = context.column_boundaries[self.index].clone();
context.with_boundaries(col_bounds)
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for Column {
Expand Down Expand Up @@ -191,6 +197,11 @@ impl PhysicalExpr for UnKnownColumn {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for UnKnownColumn {
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-expr/src/expressions/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::{ColumnarValue, Operator};
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar};

/// Perform DATE/TIME/TIMESTAMP +/- INTERVAL math
#[derive(Debug)]
#[derive(Debug, Hash)]
pub struct DateTimeIntervalExpr {
lhs: Arc<dyn PhysicalExpr>,
op: Operator,
Expand Down Expand Up @@ -185,6 +186,11 @@ impl PhysicalExpr for DateTimeIntervalExpr {
children[1].clone(),
)))
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for DateTimeIntervalExpr {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ use datafusion_expr::{
};
use std::convert::TryInto;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

/// expression to get a field of a struct array.
#[derive(Debug)]
#[derive(Debug, Hash)]
pub struct GetIndexedFieldExpr {
arg: Arc<dyn PhysicalExpr>,
key: ScalarValue,
Expand Down Expand Up @@ -153,6 +154,11 @@ impl PhysicalExpr for GetIndexedFieldExpr {
self.key.clone(),
)))
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for GetIndexedFieldExpr {
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use ahash::RandomState;
use std::any::Any;
use std::fmt::Debug;
use std::hash::Hasher;
use std::sync::Arc;

use crate::hash_utils::HashValue;
Expand Down Expand Up @@ -330,6 +331,12 @@ impl PhysicalExpr for InListExpr {
self.static_filter.clone(),
)))
}

fn dyn_hash(&self, _state: &mut dyn Hasher) {
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
// `self.static_filter` doesn't support hashing. Hence
// we cannot calculate hash for this type.
unimplemented!();
}
}

impl PartialEq<dyn Any> for InListExpr {
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-expr/src/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! IS NOT NULL expression

use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

use crate::physical_expr::down_cast_any_ref;
Expand All @@ -31,7 +32,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;

/// IS NOT NULL expression
#[derive(Debug)]
#[derive(Debug, Hash)]
pub struct IsNotNullExpr {
/// The input expression
arg: Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -91,6 +92,11 @@ impl PhysicalExpr for IsNotNullExpr {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(IsNotNullExpr::new(children[0].clone())))
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for IsNotNullExpr {
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-expr/src/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! IS NULL expression

use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

use arrow::compute;
Expand All @@ -32,7 +33,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;

/// IS NULL expression
#[derive(Debug)]
#[derive(Debug, Hash)]
pub struct IsNullExpr {
/// Input expression
arg: Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -92,6 +93,11 @@ impl PhysicalExpr for IsNullExpr {
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(IsNullExpr::new(children[0].clone())))
}

/// Feeds this expression's `Hash` implementation into the type-erased `state`.
fn dyn_hash(&self, state: &mut dyn Hasher) {
    // `Hash::hash` requires a sized hasher; `&mut dyn Hasher` is itself a
    // `Hasher`, so hash through a mutable reference to the rebound reference.
    let mut hasher = state;
    Hash::hash(self, &mut hasher);
}
}

impl PartialEq<dyn Any> for IsNullExpr {
Expand Down
Loading