Skip to content

Commit 11fc52d

Browse files
authored
Use dedicated NullEquality enum instead of null_equals_null boolean (#16419)
* Use dedicated NullEquality enum instead of null_equals_null boolean
* Fix wrong operator mapping in hash_join
* Add an example to the documentation
1 parent 87218a1 commit 11fc52d

File tree

38 files changed

+823
-363
lines changed

38 files changed

+823
-363
lines changed

datafusion/common/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub mod file_options;
4646
pub mod format;
4747
pub mod hash_utils;
4848
pub mod instant;
49+
mod null_equality;
4950
pub mod parsers;
5051
pub mod pruning;
5152
pub mod rounding;
@@ -79,6 +80,7 @@ pub use functional_dependencies::{
7980
};
8081
use hashbrown::hash_map::DefaultHashBuilder;
8182
pub use join_type::{JoinConstraint, JoinSide, JoinType};
83+
pub use null_equality::NullEquality;
8284
pub use param_value::ParamValues;
8385
pub use scalar::{ScalarType, ScalarValue};
8486
pub use schema_reference::SchemaReference;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
/// Describes how null values are treated when evaluating equality. Currently, its primary use
19+
/// case is to define how joins compare null values in their join keys.
20+
///
21+
/// # Examples
22+
///
23+
/// The following table shows whether `A` and `B` are considered equal under each `NullEquality` variant.
24+
///
25+
/// | A | B | NullEqualsNothing | NullEqualsNull |
26+
/// |------|------|-------------------|----------------|
27+
/// | NULL | NULL | false | true |
28+
/// | NULL | 'b' | false | false |
29+
/// | 'a' | NULL | false | false |
30+
/// | 'a' | 'b' | false | false |
31+
///
32+
/// # Order
33+
///
34+
/// The order on this type represents the "restrictiveness" of the behavior. The more restrictive
35+
/// a behavior is, the fewer elements are considered to be equal to null.
36+
/// [`NullEquality::NullEqualsNothing`] is the most restrictive behavior; with the derived `PartialOrd` (variant declaration order) it compares less than [`NullEquality::NullEqualsNull`].
37+
///
38+
/// This mirrors the old order with `null_equals_null` booleans, where `false` (which sorts before `true`) indicated that
39+
/// `null != null`.
40+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
41+
pub enum NullEquality {
42+
/// Null is *not* equal to anything, not even to another null (`null != null`)
43+
NullEqualsNothing,
44+
/// Null is equal to null, and only to null (`null == null`)
45+
NullEqualsNull,
46+
}

datafusion/core/src/physical_planner.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -901,12 +901,10 @@ impl DefaultPhysicalPlanner {
901901
on: keys,
902902
filter,
903903
join_type,
904-
null_equals_null,
904+
null_equality,
905905
schema: join_schema,
906906
..
907907
}) => {
908-
let null_equals_null = *null_equals_null;
909-
910908
let [physical_left, physical_right] = children.two()?;
911909

912910
// If join has expression equijoin keys, add physical projection.
@@ -1127,7 +1125,7 @@ impl DefaultPhysicalPlanner {
11271125
join_filter,
11281126
*join_type,
11291127
vec![SortOptions::default(); join_on_len],
1130-
null_equals_null,
1128+
*null_equality,
11311129
)?)
11321130
} else if session_state.config().target_partitions() > 1
11331131
&& session_state.config().repartition_joins()
@@ -1141,7 +1139,7 @@ impl DefaultPhysicalPlanner {
11411139
join_type,
11421140
None,
11431141
PartitionMode::Auto,
1144-
null_equals_null,
1142+
*null_equality,
11451143
)?)
11461144
} else {
11471145
Arc::new(HashJoinExec::try_new(
@@ -1152,7 +1150,7 @@ impl DefaultPhysicalPlanner {
11521150
join_type,
11531151
None,
11541152
PartitionMode::CollectLeft,
1155-
null_equals_null,
1153+
*null_equality,
11561154
)?)
11571155
};
11581156

datafusion/core/tests/execution/infinite_cancel.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion::physical_plan::execution_plan::Boundedness;
3333
use datafusion::physical_plan::ExecutionPlan;
3434
use datafusion::prelude::SessionContext;
3535
use datafusion_common::config::ConfigOptions;
36-
use datafusion_common::{JoinType, ScalarValue};
36+
use datafusion_common::{JoinType, NullEquality, ScalarValue};
3737
use datafusion_expr_common::operator::Operator::Gt;
3838
use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
3939
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -467,7 +467,7 @@ async fn test_infinite_join_cancel(
467467
&JoinType::Inner,
468468
None,
469469
PartitionMode::CollectLeft,
470-
true,
470+
NullEquality::NullEqualsNull,
471471
)?);
472472

473473
// 3) Wrap yields under each infinite leaf
@@ -550,7 +550,7 @@ async fn test_infinite_join_agg_cancel(
550550
&JoinType::Inner,
551551
None,
552552
PartitionMode::CollectLeft,
553-
true,
553+
NullEquality::NullEqualsNull,
554554
)?);
555555

556556
// 3) Project only one column (“value” from the left side) because we just want to sum that
@@ -714,7 +714,7 @@ async fn test_infinite_hash_join_without_repartition_and_no_agg(
714714
/* output64 */ None,
715715
// Using CollectLeft is fine—just avoid RepartitionExec’s partitioned channels.
716716
PartitionMode::CollectLeft,
717-
/* build_left */ true,
717+
/* null_equality */ NullEquality::NullEqualsNull,
718718
)?);
719719

720720
// 3) Do not apply InsertYieldExec—since there is no aggregation, InsertYieldExec would
@@ -796,7 +796,7 @@ async fn test_infinite_sort_merge_join_without_repartition_and_no_agg(
796796
/* filter */ None,
797797
JoinType::Inner,
798798
vec![SortOptions::new(true, false)], // ascending, nulls last
799-
/* null_equal */ true,
799+
/* null_equality */ NullEquality::NullEqualsNull,
800800
)?);
801801

802802
// 3) Do not apply InsertYieldExec (no aggregation, no repartition → no built-in yields).

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion::physical_plan::joins::{
3737
HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
3838
};
3939
use datafusion::prelude::{SessionConfig, SessionContext};
40-
use datafusion_common::ScalarValue;
40+
use datafusion_common::{NullEquality, ScalarValue};
4141
use datafusion_physical_expr::expressions::Literal;
4242
use datafusion_physical_expr::PhysicalExprRef;
4343

@@ -504,7 +504,7 @@ impl JoinFuzzTestCase {
504504
self.join_filter(),
505505
self.join_type,
506506
vec![SortOptions::default(); self.on_columns().len()],
507-
false,
507+
NullEquality::NullEqualsNothing,
508508
)
509509
.unwrap(),
510510
)
@@ -521,7 +521,7 @@ impl JoinFuzzTestCase {
521521
&self.join_type,
522522
None,
523523
PartitionMode::Partitioned,
524-
false,
524+
NullEquality::NullEqualsNothing,
525525
)
526526
.unwrap(),
527527
)

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use std::{
2525
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2626
use arrow::record_batch::RecordBatch;
2727
use datafusion_common::config::ConfigOptions;
28-
use datafusion_common::JoinSide;
2928
use datafusion_common::{stats::Precision, ColumnStatistics, JoinType, ScalarValue};
29+
use datafusion_common::{JoinSide, NullEquality};
3030
use datafusion_common::{Result, Statistics};
3131
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
3232
use datafusion_expr::Operator;
@@ -222,7 +222,7 @@ async fn test_join_with_swap() {
222222
&JoinType::Left,
223223
None,
224224
PartitionMode::CollectLeft,
225-
false,
225+
NullEquality::NullEqualsNothing,
226226
)
227227
.unwrap(),
228228
);
@@ -284,7 +284,7 @@ async fn test_left_join_no_swap() {
284284
&JoinType::Left,
285285
None,
286286
PartitionMode::CollectLeft,
287-
false,
287+
NullEquality::NullEqualsNothing,
288288
)
289289
.unwrap(),
290290
);
@@ -333,7 +333,7 @@ async fn test_join_with_swap_semi() {
333333
&join_type,
334334
None,
335335
PartitionMode::Partitioned,
336-
false,
336+
NullEquality::NullEqualsNothing,
337337
)
338338
.unwrap();
339339

@@ -408,7 +408,7 @@ async fn test_nested_join_swap() {
408408
&JoinType::Inner,
409409
None,
410410
PartitionMode::CollectLeft,
411-
false,
411+
NullEquality::NullEqualsNothing,
412412
)
413413
.unwrap();
414414
let child_schema = child_join.schema();
@@ -425,7 +425,7 @@ async fn test_nested_join_swap() {
425425
&JoinType::Left,
426426
None,
427427
PartitionMode::CollectLeft,
428-
false,
428+
NullEquality::NullEqualsNothing,
429429
)
430430
.unwrap();
431431

@@ -464,7 +464,7 @@ async fn test_join_no_swap() {
464464
&JoinType::Inner,
465465
None,
466466
PartitionMode::CollectLeft,
467-
false,
467+
NullEquality::NullEqualsNothing,
468468
)
469469
.unwrap(),
470470
);
@@ -690,7 +690,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
690690
&join_type,
691691
Some(projection),
692692
PartitionMode::Partitioned,
693-
false,
693+
NullEquality::NullEqualsNothing,
694694
)?);
695695

696696
let swapped = join
@@ -851,7 +851,7 @@ fn check_join_partition_mode(
851851
&JoinType::Inner,
852852
None,
853853
PartitionMode::Auto,
854-
false,
854+
NullEquality::NullEqualsNothing,
855855
)
856856
.unwrap(),
857857
);
@@ -1498,7 +1498,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
14981498
&t.initial_join_type,
14991499
None,
15001500
t.initial_mode,
1501-
false,
1501+
NullEquality::NullEqualsNothing,
15021502
)?) as _;
15031503

15041504
let optimized_join_plan = hash_join_swap_subrule(join, &ConfigOptions::new())?;

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use datafusion::datasource::memory::MemorySourceConfig;
2525
use datafusion::datasource::physical_plan::CsvSource;
2626
use datafusion::datasource::source::DataSourceExec;
2727
use datafusion_common::config::ConfigOptions;
28-
use datafusion_common::{JoinSide, JoinType, Result, ScalarValue};
28+
use datafusion_common::{JoinSide, JoinType, NullEquality, Result, ScalarValue};
2929
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3030
use datafusion_execution::object_store::ObjectStoreUrl;
3131
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -883,7 +883,7 @@ fn test_join_after_projection() -> Result<()> {
883883
])),
884884
)),
885885
&JoinType::Inner,
886-
true,
886+
NullEquality::NullEqualsNull,
887887
None,
888888
None,
889889
StreamJoinPartitionMode::SinglePartition,
@@ -997,7 +997,7 @@ fn test_join_after_required_projection() -> Result<()> {
997997
])),
998998
)),
999999
&JoinType::Inner,
1000-
true,
1000+
NullEquality::NullEqualsNull,
10011001
None,
10021002
None,
10031003
StreamJoinPartitionMode::SinglePartition,
@@ -1158,7 +1158,7 @@ fn test_hash_join_after_projection() -> Result<()> {
11581158
&JoinType::Inner,
11591159
None,
11601160
PartitionMode::Auto,
1161-
true,
1161+
NullEquality::NullEqualsNull,
11621162
)?);
11631163
let projection = Arc::new(ProjectionExec::try_new(
11641164
vec![

datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use arrow::compute::SortOptions;
3030
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3131
use arrow::record_batch::RecordBatch;
3232
use datafusion_common::tree_node::{TransformedResult, TreeNode};
33-
use datafusion_common::{assert_contains, Result};
33+
use datafusion_common::{assert_contains, NullEquality, Result};
3434
use datafusion_common::config::ConfigOptions;
3535
use datafusion_datasource::source::DataSourceExec;
3636
use datafusion_execution::TaskContext;
@@ -1171,7 +1171,7 @@ fn hash_join_exec(
11711171
&JoinType::Inner,
11721172
None,
11731173
PartitionMode::Partitioned,
1174-
false,
1174+
NullEquality::NullEqualsNothing,
11751175
)
11761176
.unwrap(),
11771177
)

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion_common::config::ConfigOptions;
3333
use datafusion_common::stats::Precision;
3434
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3535
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
36-
use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
36+
use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result, Statistics};
3737
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3838
use datafusion_execution::object_store::ObjectStoreUrl;
3939
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -190,7 +190,7 @@ pub fn sort_merge_join_exec(
190190
None,
191191
*join_type,
192192
vec![SortOptions::default(); join_on.len()],
193-
false,
193+
NullEquality::NullEqualsNothing,
194194
)
195195
.unwrap(),
196196
)
@@ -236,7 +236,7 @@ pub fn hash_join_exec(
236236
join_type,
237237
None,
238238
PartitionMode::Partitioned,
239-
true,
239+
NullEquality::NullEqualsNull,
240240
)?))
241241
}
242242

0 commit comments

Comments
 (0)