Skip to content

Commit

Permalink
Minor: Move test with code (#7392)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Aug 24, 2023
1 parent 283c4b5 commit 32b2330
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 78 deletions.
78 changes: 0 additions & 78 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,81 +410,3 @@ use datafusion_execution::TaskContext;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, ordering_equivalence_properties_helper, udf,
};

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::DataType;
use arrow::datatypes::Schema;

use crate::physical_plan::Distribution;
use crate::physical_plan::Partitioning;
use crate::physical_plan::PhysicalExpr;
use datafusion_physical_expr::expressions::Column;

use std::sync::Arc;

#[tokio::test]
async fn partitioning_satisfy_distribution() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
arrow::datatypes::Field::new("column_1", DataType::Int64, false),
arrow::datatypes::Field::new("column_2", DataType::Utf8, false),
]));

let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
];

let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
];

let distribution_types = vec![
Distribution::UnspecifiedDistribution,
Distribution::SinglePartition,
Distribution::HashPartitioned(partition_exprs1.clone()),
];

let single_partition = Partitioning::UnknownPartitioning(1);
let unspecified_partition = Partitioning::UnknownPartitioning(10);
let round_robin_partition = Partitioning::RoundRobinBatch(10);
let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);

for distribution in distribution_types {
let result = (
single_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
unspecified_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
round_robin_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
hash_partition1.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
hash_partition2.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
);

match distribution {
Distribution::UnspecifiedDistribution => {
assert_eq!(result, (true, true, true, true, true))
}
Distribution::SinglePartition => {
assert_eq!(result, (true, false, false, false, false))
}
Distribution::HashPartitioned(_) => {
assert_eq!(result, (false, false, false, true, false))
}
}
}

Ok(())
}
}
77 changes: 77 additions & 0 deletions datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,80 @@ impl Distribution {
}
}
}

#[cfg(test)]
mod tests {
use crate::expressions::Column;

use super::*;
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::datatypes::Schema;
use datafusion_common::Result;

use std::sync::Arc;

#[test]
fn partitioning_satisfy_distribution() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("column_1", DataType::Int64, false),
Field::new("column_2", DataType::Utf8, false),
]));

let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
];

let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
];

let distribution_types = vec![
Distribution::UnspecifiedDistribution,
Distribution::SinglePartition,
Distribution::HashPartitioned(partition_exprs1.clone()),
];

let single_partition = Partitioning::UnknownPartitioning(1);
let unspecified_partition = Partitioning::UnknownPartitioning(10);
let round_robin_partition = Partitioning::RoundRobinBatch(10);
let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);

for distribution in distribution_types {
let result = (
single_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
unspecified_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
round_robin_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
hash_partition1.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
hash_partition2.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
);

match distribution {
Distribution::UnspecifiedDistribution => {
assert_eq!(result, (true, true, true, true, true))
}
Distribution::SinglePartition => {
assert_eq!(result, (true, false, false, false, false))
}
Distribution::HashPartitioned(_) => {
assert_eq!(result, (false, false, false, true, false))
}
}
}

Ok(())
}
}

0 comments on commit 32b2330

Please sign in to comment.