Skip to content

Commit c3a6269

Browse files
sameeragarwal authored and rxin committed
[SPARK-13789] Infer additional constraints from attribute equality
## What changes were proposed in this pull request? This PR adds support for inferring an additional set of data constraints based on attribute equality. For example, if an operator has constraints of the form (`a = 5`, `a = b`), we can now automatically infer an additional constraint of the form `b = 5`. ## How was this patch tested? Tested that new constraints are properly inferred for filters (by adding a new test) and equi-joins (by modifying an existing test). Author: Sameer Agarwal <sameer@databricks.com> Closes #11618 from sameeragarwal/infer-isequal-constraints.
1 parent 416e71a commit c3a6269

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
3232
*/
3333
protected def getRelevantConstraints(constraints: Set[Expression]): Set[Expression] = {
3434
constraints
35+
.union(inferAdditionalConstraints(constraints))
3536
.union(constructIsNotNullConstraints(constraints))
3637
.filter(constraint =>
3738
constraint.references.nonEmpty && constraint.references.subsetOf(outputSet))
@@ -63,6 +64,26 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
6364
}.foldLeft(Set.empty[Expression])(_ union _.toSet)
6465
}
6566

67+
/**
 * Derives extra constraints that follow from attribute-to-attribute equalities in the given
 * set. For example, given the constraints (`a = 5`, `a = b`), the result contains `b = 5`.
 *
 * For every constraint of the form `l = r` where both sides are attributes, each of the
 * remaining constraints is rewritten twice: once with `l` replaced by `r`, and once with `r`
 * replaced by `l`. Anything already present in `constraints` is excluded from the result, so
 * only newly inferred expressions are returned.
 */
private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
  // Rewrites every attribute in `expr` that is semantically equal to `from` into `to`.
  def substitute(expr: Expression, from: Attribute, to: Attribute): Expression =
    expr transform {
      case attr: Attribute if attr.semanticEquals(from) => to
    }

  val candidates = constraints.foldLeft(Set.empty[Expression]) {
    case (acc, equality @ EqualTo(left: Attribute, right: Attribute)) =>
      // The equality itself is left out so it is not rewritten into a trivial `x = x`.
      val others = constraints - equality
      acc ++ others.map(substitute(_, left, right)) ++ others.map(substitute(_, right, left))
    case (acc, _) =>
      // Only attribute-to-attribute equalities drive inference.
      acc
  }

  // Rewrites that left a constraint unchanged are not new information; drop them.
  candidates -- constraints
}
86+
6687
/**
6788
* An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
6889
* example, if this set contains the expression `a = 2` then that expression is guaranteed to

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ class ConstraintPropagationSuite extends SparkFunSuite {
158158
tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100,
159159
tr1.resolveQuoted("a", caseInsensitiveResolution).get ===
160160
tr2.resolveQuoted("a", caseInsensitiveResolution).get,
161+
tr2.resolveQuoted("a", caseInsensitiveResolution).get > 10,
161162
IsNotNull(tr2.resolveQuoted("a", caseInsensitiveResolution).get),
162163
IsNotNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get),
163164
IsNotNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get))))
@@ -203,4 +204,17 @@ class ConstraintPropagationSuite extends SparkFunSuite {
203204
.join(tr2.where('d.attr < 100), FullOuter, Some("tr1.a".attr === "tr2.a".attr))
204205
.analyze.constraints.isEmpty)
205206
}
207+
208+
test("infer additional constraints in filters") {
  // A filter with `a > 10` and `a = b` should also expose the inferred constraint `b > 10`.
  val relation = LocalRelation('a.int, 'b.int, 'c.int)
  val colA = resolveColumn(relation, "a")
  val colB = resolveColumn(relation, "b")

  val filtered = relation.where('a.attr > 10 && 'a.attr === 'b.attr)

  val expected = ExpressionSet(Seq(
    colA > 10,
    colB > 10,
    colA === colB,
    IsNotNull(colA),
    IsNotNull(colB)))

  verifyConstraints(filtered.analyze.constraints, expected)
}
206220
}

0 commit comments

Comments
 (0)