@@ -74,12 +74,16 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
7474}
7575
7676
77- case class Filter (condition : Expression , child : SparkPlan ) extends UnaryNode with CodegenSupport {
77+ case class Filter (condition : Expression , child : SparkPlan )
78+ extends UnaryNode with CodegenSupport with PredicateHelper {
7879 override def output : Seq [Attribute ] = child.output
7980
8081 private [sql] override lazy val metrics = Map (
8182 " numOutputRows" -> SQLMetrics .createLongMetric(sparkContext, " number of output rows" ))
8283
84+ override protected def validConstraints : Set [Expression ] =
85+ child.constraints.union(splitConjunctivePredicates(condition).toSet)
86+
8387 override def upstreams (): Seq [RDD [InternalRow ]] = {
8488 child.asInstanceOf [CodegenSupport ].upstreams()
8589 }
@@ -90,20 +94,53 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
9094
9195 override def doConsume (ctx : CodegenContext , input : Seq [ExprCode ]): String = {
9296 val numOutput = metricTerm(ctx, " numOutputRows" )
93- val expr = ExpressionCanonicalizer .execute(
94- BindReferences .bindReference(condition, child.output))
95- ctx.currentVars = input
96- val eval = expr.gen(ctx)
97- val nullCheck = if (expr.nullable) {
98- s " ! ${eval.isNull} && "
99- } else {
100- s " "
97+
98+ // Find out the columns that is nullable and all the `null` will be filtered out by condition,
99+ // this help to simplify the generated code for condition.
100+ val filterNullColumns = child.output.filter { case e =>
101+ e.nullable && constraints.contains(IsNotNull (e)) && ! child.constraints.contains(IsNotNull (e))
102+ }
103+ val filterOutNullColumn = filterNullColumns.map { a =>
104+ val i = child.output.indexOf(a)
105+ s " if ( ${input(i).isNull}) continue; "
106+ }.mkString(" \n " )
107+
108+ // Change the nullability for those columns to simplify generated code of condition
109+ val newInput = child.output.map { a =>
110+ if (filterNullColumns.contains(a)) {
111+ a.withNullability(false )
112+ } else {
113+ a
114+ }
101115 }
116+
117+ // Split the conjunctive predicates, so we can check them one by one.
118+ // Also filter out the predicate of `IsNotNull`
119+ val conjPredicates = splitConjunctivePredicates(condition).filter {
120+ case IsNotNull (a) if filterNullColumns.contains(a) => false
121+ case _ => true
122+ }
123+ ctx.currentVars = input
124+ val predicates = conjPredicates.map { e =>
125+ val bound = ExpressionCanonicalizer .execute(
126+ BindReferences .bindReference(e, newInput))
127+ val ev = bound.gen(ctx)
128+ val nullCheck = if (bound.nullable) {
129+ s " ${ev.isNull} || "
130+ } else {
131+ s " "
132+ }
133+ s """
134+ | ${ev.code}
135+ |if ( ${nullCheck}! ${ev.value}) continue;
136+ """ .stripMargin
137+ }.mkString(" \n " )
138+
102139 s """
103- | ${eval.code}
104- |if (!( $nullCheck ${eval.value} )) continue;
140+ | $filterOutNullColumn
141+ | $predicates
105142 | $numOutput.add(1);
106- | ${consume(ctx, ctx.currentVars )}
143+ | ${consume(ctx, input )}
107144 """ .stripMargin
108145 }
109146
0 commit comments