Skip to content

Commit 9d87ed1

Browse files
committed
Use analyzer (which runs to fixed point) instead of manually removing analysis operators.
1 parent: 221909e · commit: 9d87ed1

File tree

3 files changed

+29
-30
lines changed

3 files changed

+29
-30
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -186,8 +186,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
186186

187187
/** Caches the specified table in-memory. */
188188
def cacheTable(tableName: String): Unit = {
189-
val currentTable = catalog.lookupRelation(None, tableName)
190-
val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match {
189+
val currentTable = table(tableName).queryExecution.analyzed
190+
val asInMemoryRelation = currentTable match {
191191
case _: InMemoryRelation =>
192192
currentTable.logicalPlan
193193

@@ -202,7 +202,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
202202

203203
/** Removes the specified table from the in-memory cache. */
204204
def uncacheTable(tableName: String): Unit = {
205-
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
205+
table(tableName).queryExecution.analyzed match {
206206
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
207207
// we reregister the RDD as a table.
208208
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
@@ -218,8 +218,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
218218

219219
/** Returns true if the table is currently cached in-memory. */
220220
def isCached(tableName: String): Boolean = {
221-
val relation = catalog.lookupRelation(None, tableName)
222-
EliminateAnalysisOperators(relation) match {
221+
val relation = table(tableName).queryExecution.analyzed
222+
relation match {
223223
case _: InMemoryRelation => true
224224
case _ => false
225225
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 24 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -20,13 +20,31 @@ package org.apache.spark.sql
2020
import org.apache.spark.sql.TestData._
2121
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
2222
import org.apache.spark.sql.test.TestSQLContext
23+
import org.apache.spark.sql.test.TestSQLContext._
2324

2425
class CachedTableSuite extends QueryTest {
2526
TestData // Load test tables.
2627

27-
// NOTE: ALL TESTS ARE IGNORED PENDING SPARK-2264
28+
test("SPARK-1669: cacheTable should be idempotent") {
29+
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
2830

29-
ignore("read from cached table and uncache") {
31+
cacheTable("testData")
32+
table("testData").queryExecution.analyzed match {
33+
case _: InMemoryRelation =>
34+
case _ =>
35+
fail("testData should be cached")
36+
}
37+
38+
cacheTable("testData")
39+
table("testData").queryExecution.analyzed match {
40+
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
41+
fail("cacheTable is not idempotent")
42+
43+
case _ =>
44+
}
45+
}
46+
47+
test("read from cached table and uncache") {
3048
TestSQLContext.cacheTable("testData")
3149

3250
checkAnswer(
@@ -53,20 +71,20 @@ class CachedTableSuite extends QueryTest {
5371
}
5472
}
5573

56-
ignore("correct error on uncache of non-cached table") {
74+
test("correct error on uncache of non-cached table") {
5775
intercept[IllegalArgumentException] {
5876
TestSQLContext.uncacheTable("testData")
5977
}
6078
}
6179

62-
ignore("SELECT Star Cached Table") {
80+
test("SELECT Star Cached Table") {
6381
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
6482
TestSQLContext.cacheTable("selectStar")
6583
TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
6684
TestSQLContext.uncacheTable("selectStar")
6785
}
6886

69-
ignore("Self-join cached") {
87+
test("Self-join cached") {
7088
val unCachedAnswer =
7189
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
7290
TestSQLContext.cacheTable("testData")
@@ -76,7 +94,7 @@ class CachedTableSuite extends QueryTest {
7694
TestSQLContext.uncacheTable("testData")
7795
}
7896

79-
ignore("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
97+
test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
8098
TestSQLContext.sql("CACHE TABLE testData")
8199
TestSQLContext.table("testData").queryExecution.executedPlan match {
82100
case _: InMemoryColumnarTableScan => // Found evidence of caching

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 0 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -406,23 +406,4 @@ class SQLQuerySuite extends QueryTest {
406406
)
407407
clear()
408408
}
409-
410-
test("SPARK-1669: cacheTable should be idempotent") {
411-
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
412-
413-
cacheTable("testData")
414-
EliminateAnalysisOperators(table("testData").logicalPlan) match {
415-
case _: InMemoryRelation =>
416-
case _ =>
417-
fail("testData should be cached")
418-
}
419-
420-
cacheTable("testData")
421-
EliminateAnalysisOperators(table("testData").logicalPlan) match {
422-
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
423-
fail("cacheTable is not idempotent")
424-
425-
case _ =>
426-
}
427-
}
428409
}

0 commit comments

Comments
 (0)