
Commit 367df92

Recache data in the REFRESH TABLE command.
1 parent 0424da6 commit 367df92

2 files changed: 64 additions, 1 deletion

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 16 additions & 0 deletions
@@ -347,7 +347,23 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Refresh the given table's metadata first.
     sqlContext.catalog.refreshTable(databaseName, tableName)
+
+    // If this table is cached as an InMemoryColumnarRelation, drop the original
+    // cached version and make the new version cached lazily.
+    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+    // Use lookupCachedData directly since RefreshTable also takes databaseName.
+    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    if (isCached) {
+      // Create a data frame to represent the table.
+      val df = DataFrame(sqlContext, logicalPlan)
+      // Uncache the logicalPlan.
+      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+      // Cache it again.
+      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
+    }
+
     Seq.empty[Row]
   }
 }
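For reference, the recache step added to RefreshTable follows the same pattern a user could apply by hand through the public caching API: drop the stale in-memory copy, then re-register the table so it is cached again lazily on the next scan. A minimal sketch of that pattern, assuming a HiveContext and a caller-supplied table name (the helper name and its use of the public API are illustrative, not part of this commit):

    import org.apache.spark.sql.hive.HiveContext

    // Illustrative helper: approximates what the RefreshTable command now does
    // internally, expressed with the public SQLContext/HiveContext caching API.
    def refreshAndRecache(sqlContext: HiveContext, tableName: String): Unit = {
      // Pick up new files and metadata for the (external) table.
      sqlContext.refreshTable(tableName)
      if (sqlContext.isCached(tableName)) {
        // Drop the stale in-memory columnar copy...
        sqlContext.uncacheTable(tableName)
        // ...and mark the table for caching again; it is materialized lazily
        // on the next scan, much like the command's cacheQuery call.
        sqlContext.cacheTable(tableName)
      }
    }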

sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala

Lines changed: 48 additions & 1 deletion
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
 import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.util.Utils
 
 class CachedTableSuite extends QueryTest {
 
@@ -155,4 +158,48 @@ class CachedTableSuite extends QueryTest {
     assertCached(table("udfTest"))
     uncacheTable("udfTest")
   }
+
+  test("REFRESH TABLE also needs to recache the data") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql("REFRESH TABLE refreshTable")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. The REFRESH TABLE command should not make an uncached
+    // table cached.
+    sql("REFRESH TABLE refreshTable")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+    // It is still not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+    sql("DROP TABLE refreshTable")
+    Utils.deleteRecursively(tempPath)
+  }
 }
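The test above also doubles as the user-facing workflow this commit fixes. A condensed sketch of that workflow as a standalone driver program, assuming a HiveContext; the table name and parquet path are placeholders, not taken from the commit:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Hypothetical driver mirroring the test scenario above.
    object RefreshTableExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("RefreshTableExample"))
        val sqlContext = new HiveContext(sc)

        // An external parquet table whose files may be appended by other jobs.
        sqlContext.createExternalTable("myTable", "/tmp/parquet-data", "parquet")
        sqlContext.sql("CACHE TABLE myTable")

        // ... another job appends new parquet files under /tmp/parquet-data ...

        // Before this commit, REFRESH TABLE only refreshed metadata, so queries
        // kept reading the stale in-memory cache. With this change the cache is
        // dropped and rebuilt lazily, so the next scan sees the appended files.
        sqlContext.sql("REFRESH TABLE myTable")
        println(sqlContext.table("myTable").count())

        sc.stop()
      }
    }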
