19
19
20
20
use crate :: optimizer:: ApplyOrder ;
21
21
use crate :: { OptimizerConfig , OptimizerRule } ;
22
- use datafusion_common:: Result ;
22
+ use datafusion_common:: tree_node:: Transformed ;
23
+ use datafusion_common:: { internal_err, Result } ;
23
24
use datafusion_expr:: {
24
25
and, logical_plan:: Filter , logical_plan:: JoinType , Expr , ExprSchemable , LogicalPlan ,
25
26
} ;
27
+ use std:: mem;
26
28
use std:: sync:: Arc ;
27
29
28
30
/// The FilterNullJoinKeys rule will identify inner joins with equi-join conditions
@@ -32,66 +34,69 @@ use std::sync::Arc;
32
34
#[ derive( Default ) ]
33
35
pub struct FilterNullJoinKeys { }
34
36
35
- impl FilterNullJoinKeys {
36
- pub const NAME : & ' static str = "filter_null_join_keys" ;
37
- }
38
-
39
37
impl OptimizerRule for FilterNullJoinKeys {
40
38
fn try_optimize (
41
39
& self ,
42
- plan : & LogicalPlan ,
43
- config : & dyn OptimizerConfig ,
40
+ _plan : & LogicalPlan ,
41
+ _config : & dyn OptimizerConfig ,
44
42
) -> Result < Option < LogicalPlan > > {
43
+ internal_err ! ( "Should have called FilterNullJoinKeys::rewrite" )
44
+ }
45
+
46
+ fn supports_rewrite ( & self ) -> bool {
47
+ true
48
+ }
49
+
50
+ fn apply_order ( & self ) -> Option < ApplyOrder > {
51
+ Some ( ApplyOrder :: BottomUp )
52
+ }
53
+
54
+ fn rewrite (
55
+ & self ,
56
+ plan : LogicalPlan ,
57
+ config : & dyn OptimizerConfig ,
58
+ ) -> Result < Transformed < LogicalPlan > > {
45
59
if !config. options ( ) . optimizer . filter_null_join_keys {
46
- return Ok ( None ) ;
60
+ return Ok ( Transformed :: no ( plan ) ) ;
47
61
}
48
62
49
63
match plan {
50
- LogicalPlan :: Join ( join) if join. join_type == JoinType :: Inner => {
51
- let mut join = join. clone ( ) ;
52
-
64
+ LogicalPlan :: Join ( mut join) if join. join_type == JoinType :: Inner => {
53
65
let left_schema = join. left . schema ( ) ;
54
66
let right_schema = join. right . schema ( ) ;
55
67
56
68
let mut left_filters = vec ! [ ] ;
57
69
let mut right_filters = vec ! [ ] ;
58
70
59
- for ( l, r) in & join. on {
71
+ for ( l, r) in & mut join. on {
60
72
if l. nullable ( left_schema) ? {
61
- left_filters. push ( l . clone ( ) ) ;
73
+ left_filters. push ( mem :: take ( l ) ) ;
62
74
}
63
75
64
76
if r. nullable ( right_schema) ? {
65
- right_filters. push ( r . clone ( ) ) ;
77
+ right_filters. push ( mem :: take ( r ) ) ;
66
78
}
67
79
}
68
80
69
81
if !left_filters. is_empty ( ) {
70
82
let predicate = create_not_null_predicate ( left_filters) ;
71
83
join. left = Arc :: new ( LogicalPlan :: Filter ( Filter :: try_new (
72
- predicate,
73
- join. left . clone ( ) ,
84
+ predicate, join. left ,
74
85
) ?) ) ;
75
86
}
76
87
if !right_filters. is_empty ( ) {
77
88
let predicate = create_not_null_predicate ( right_filters) ;
78
89
join. right = Arc :: new ( LogicalPlan :: Filter ( Filter :: try_new (
79
- predicate,
80
- join. right . clone ( ) ,
90
+ predicate, join. right ,
81
91
) ?) ) ;
82
92
}
83
- Ok ( Some ( LogicalPlan :: Join ( join) ) )
93
+ Ok ( Transformed :: yes ( LogicalPlan :: Join ( join) ) )
84
94
}
85
- _ => Ok ( None ) ,
95
+ _ => Ok ( Transformed :: no ( plan ) ) ,
86
96
}
87
97
}
88
-
89
98
fn name ( & self ) -> & str {
90
- Self :: NAME
91
- }
92
-
93
- fn apply_order ( & self ) -> Option < ApplyOrder > {
94
- Some ( ApplyOrder :: BottomUp )
99
+ "filter_null_join_keys"
95
100
}
96
101
}
97
102
0 commit comments