Skip to content

Commit b94fb28

Browse files
nsycahvanhovell
authored andcommitted
[SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results
## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the NULL-aware expression of a NOT IN query is expanded in Rule `RewritePredicateSubquery`. Example: The query select a1,b1 from t1 where (a1,b1) not in (select a2,b2 from t2); has the (a1, b1) = (a2, b2) rewritten from (before this fix): Join LeftAnti, ((isnull((_1#2 = a2#16)) || isnull((_2#3 = b2#17))) || ((_1#2 = a2#16) && (_2#3 = b2#17))) to (after this fix): Join LeftAnti, (((_1#2 = a2#16) || isnull((_1#2 = a2#16))) && ((_2#3 = b2#17) || isnull((_2#3 = b2#17)))) ## How was this patch tested? sql/test, catalyst/test and new test cases in SQLQueryTestSuite. Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #16467 from nsyca/19017. (cherry picked from commit cdb691e) Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
1 parent d128b6a commit b94fb28

File tree

5 files changed

+131
-6
lines changed

5 files changed

+131
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,14 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
6868
// Note that will almost certainly be planned as a Broadcast Nested Loop join.
6969
// Use EXISTS if performance matters to you.
7070
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
71-
val anyNull = splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or)
72-
Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get)))
71+
// Expand the NOT IN expression with the NULL-aware semantic
72+
// to its full form. That is from:
73+
// (a1,b1,...) = (a2,b2,...)
74+
// to
75+
// (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ...
76+
val joinConds = splitConjunctivePredicates(joinCond.get)
77+
val pairs = joinConds.map(c => Or(c, IsNull(c))).reduceLeft(And)
78+
Join(outerPlan, sub, LeftAnti, Option(pairs))
7379
case (p, predicate) =>
7480
val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
7581
Project(p.output, Filter(newCond.get, inputPlan))
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
-- This file contains test cases for NOT IN subquery with multiple columns.
2+
3+
-- The data sets are populated as follows:
4+
-- 1) When T1.A1 = T2.A2
5+
-- 1.1) T1.B1 = T2.B2
6+
-- 1.2) T1.B1 = T2.B2 returns false
7+
-- 1.3) T1.B1 is null
8+
-- 1.4) T2.B2 is null
9+
-- 2) When T1.A1 = T2.A2 returns false
10+
-- 3) When T1.A1 is null
11+
-- 4) When T1.A2 is null
12+
13+
-- T1.A1 T1.B1 T2.A2 T2.B2
14+
-- ----- ----- ----- -----
15+
-- 1 1 1 1 (1.1)
16+
-- 1 3 (1.2)
17+
-- 1 null 1 null (1.3 & 1.4)
18+
--
19+
-- 2 1 1 1 (2)
20+
-- null 1 (3)
21+
-- null 3 (4)
22+
23+
create temporary view t1 as select * from values
24+
(1, 1), (2, 1), (null, 1),
25+
(1, 3), (null, 3),
26+
(1, null), (null, 2)
27+
as t1(a1, b1);
28+
29+
create temporary view t2 as select * from values
30+
(1, 1),
31+
(null, 3),
32+
(1, null)
33+
as t2(a2, b2);
34+
35+
-- multiple columns in NOT IN
36+
-- TC 01.01
37+
select a1,b1
38+
from t1
39+
where (a1,b1) not in (select a2,b2
40+
from t2);
41+
42+
-- multiple columns with expressions in NOT IN
43+
-- TC 01.02
44+
select a1,b1
45+
from t1
46+
where (a1-1,b1) not in (select a2,b2
47+
from t2);
48+
49+
-- multiple columns with expressions in NOT IN
50+
-- TC 01.02
51+
select a1,b1
52+
from t1
53+
where (a1,b1) not in (select a2+1,b2
54+
from t2);
55+
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
-- Automatically generated by SQLQueryTestSuite
2+
-- Number of queries: 5
3+
4+
5+
-- !query 0
6+
create temporary view t1 as select * from values
7+
(1, 1), (2, 1), (null, 1),
8+
(1, 3), (null, 3),
9+
(1, null), (null, 2)
10+
as t1(a1, b1)
11+
-- !query 0 schema
12+
struct<>
13+
-- !query 0 output
14+
15+
16+
17+
-- !query 1
18+
create temporary view t2 as select * from values
19+
(1, 1),
20+
(null, 3),
21+
(1, null)
22+
as t2(a2, b2)
23+
-- !query 1 schema
24+
struct<>
25+
-- !query 1 output
26+
27+
28+
29+
-- !query 2
30+
select a1,b1
31+
from t1
32+
where (a1,b1) not in (select a2,b2
33+
from t2)
34+
-- !query 2 schema
35+
struct<a1:int,b1:int>
36+
-- !query 2 output
37+
2 1
38+
39+
40+
-- !query 3
41+
select a1,b1
42+
from t1
43+
where (a1-1,b1) not in (select a2,b2
44+
from t2)
45+
-- !query 3 schema
46+
struct<a1:int,b1:int>
47+
-- !query 3 output
48+
1 1
49+
50+
51+
-- !query 4
52+
select a1,b1
53+
from t1
54+
where (a1,b1) not in (select a2+1,b2
55+
from t2)
56+
-- !query 4 schema
57+
struct<a1:int,b1:int>
58+
-- !query 4 output
59+
1 1

sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
163163
s"-- Number of queries: ${outputs.size}\n\n\n" +
164164
outputs.zipWithIndex.map{case (qr, i) => qr.toString(i)}.mkString("\n\n\n") + "\n"
165165
}
166-
stringToFile(new File(testCase.resultFile), goldenOutput)
166+
val resultFile = new File(testCase.resultFile);
167+
val parent = resultFile.getParentFile();
168+
if (!parent.exists()) {
169+
assert(parent.mkdirs(), "Could not create directory: " + parent)
170+
}
171+
stringToFile(resultFile, goldenOutput)
167172
}
168173

169174
// Read back the golden file.

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,12 +263,12 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
263263
Row(1, 2.0) :: Row(1, 2.0) :: Nil)
264264

265265
checkAnswer(
266-
sql("select * from l where a not in (select c from t where b < d)"),
267-
Row(1, 2.0) :: Row(1, 2.0) :: Row(3, 3.0) :: Nil)
266+
sql("select * from l where (a, b) not in (select c, d from t) and a < 4"),
267+
Row(1, 2.0) :: Row(1, 2.0) :: Row(2, 1.0) :: Row(2, 1.0) :: Row(3, 3.0) :: Nil)
268268

269269
// Empty sub-query
270270
checkAnswer(
271-
sql("select * from l where a not in (select c from r where c > 10 and b < d)"),
271+
sql("select * from l where (a, b) not in (select c, d from r where c > 10)"),
272272
Row(1, 2.0) :: Row(1, 2.0) :: Row(2, 1.0) :: Row(2, 1.0) ::
273273
Row(3, 3.0) :: Row(null, null) :: Row(null, 5.0) :: Row(6, null) :: Nil)
274274

0 commit comments

Comments
 (0)