
Commit 767f582

[SPARK-6851][SQL] Create new instance for each converted parquet relation
Otherwise we end up rewriting predicates to be trivially equal (i.e. `a#1 = a#2` -> `a#3 = a#3`), at which point the query is no longer valid.

Author: Michael Armbrust <michael@databricks.com>

Closes #5458 from marmbrus/selfJoinParquet and squashes the following commits:

22df77c [Michael Armbrust] [SPARK-6851][SQL] Create new instance for each converted parquet relation

(cherry picked from commit 23d5f88)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
    sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
1 parent 48321b8 commit 767f582
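
For context, the bug this patch fixes surfaces when the same metastore Parquet table appears on both sides of a join. Below is a minimal sketch of the failing scenario, assuming a HiveContext val named `hiveContext` with metastore Parquet conversion enabled and a Parquet-backed Hive table named `orders` (mirroring the regression test added in this commit); the snippet is an illustration, not part of the patch:

// Illustrative sketch only. Assumes `hiveContext` is a val of type
// org.apache.spark.sql.hive.HiveContext and that `orders` is a Hive table
// stored as Parquet, so each reference to it is converted to a data source
// Parquet relation during analysis.
val selfJoin = hiveContext.sql(
  """
    |SELECT a.id, b.id
    |FROM orders a
    |JOIN orders b
    |  ON a.id = b.id
  """.stripMargin)

// Before this commit, both references to `orders` could resolve to the same
// cached converted relation, so their output attributes carried identical
// expression IDs. The conversion step then rewrote the join condition
// `a.id#1 = b.id#2` into the trivially true `id#3 = id#3`, and the query no
// longer expressed the intended join. Returning `result.newInstance()` gives
// each conversion a copy with fresh expression IDs, keeping the two sides
// distinct.
selfJoin.collect()

The new test in SQLQuerySuite.scala exercises the same pattern end to end with partitioned Parquet tables.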

File tree

2 files changed: +80 -1 lines


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 3 additions & 1 deletion
@@ -279,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         }
       }
 
-    if (metastoreRelation.hiveQlTable.isPartitioned) {
+    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
       val partitions = metastoreRelation.hiveQlPartitions.map { p =>
@@ -314,6 +314,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
       parquetRelation
     }
+
+    result.newInstance()
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 77 additions & 0 deletions
@@ -34,13 +34,90 @@ case class Nested3(f3: Int)
 case class NestedArray2(b: Seq[Int])
 case class NestedArray1(a: NestedArray2)
 
+case class Order(
+    id: Int,
+    make: String,
+    `type`: String,
+    price: Int,
+    pdate: String,
+    customer: String,
+    city: String,
+    state: String,
+    month: Int)
+
 /**
  * A collection of hive query tests where we generate the answers ourselves instead of depending on
  * Hive to generate them (in contrast to HiveQuerySuite).  Often this is because the query is
  * valid, but Hive currently cannot execute it.
  */
 class SQLQuerySuite extends QueryTest {
 
+  test("SPARK-6851: Self-joined converted parquet tables") {
+    val orders = Seq(
+      Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
+      Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
+      Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
+      Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
+      Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
+      Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
+      Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
+      Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
+      Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))
+
+    val orderUpdates = Seq(
+      Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))
+
+    orders.toDF.registerTempTable("orders1")
+    orderUpdates.toDF.registerTempTable("orderupdates1")
+
+    sql(
+      """CREATE TABLE orders(
+        |  id INT,
+        |  make String,
+        |  type String,
+        |  price INT,
+        |  pdate String,
+        |  customer String,
+        |  city String)
+        |PARTITIONED BY (state STRING, month INT)
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    sql(
+      """CREATE TABLE orderupdates(
+        |  id INT,
+        |  make String,
+        |  type String,
+        |  price INT,
+        |  pdate String,
+        |  customer String,
+        |  city String)
+        |PARTITIONED BY (state STRING, month INT)
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
+    sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
+
+    checkAnswer(
+      sql(
+        """
+          |select orders.state, orders.month
+          |from orders
+          |join (
+          |  select distinct orders.state,orders.month
+          |  from orders
+          |  join orderupdates
+          |    on orderupdates.id = orders.id) ao
+          |  on ao.state = orders.state and ao.month = orders.month
+        """.stripMargin),
+      (1 to 6).map(_ => Row("CA", 20151)))
+  }
+
   test("SPARK-5371: union with null and sum") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.registerTempTable("table1")