
Commit f3709a0

imback82 authored and cloud-fan committed
[SPARK-33786][SQL] The storage level for a cache should be respected when a table name is altered
### What changes were proposed in this pull request?

This PR proposes to retain the cache's storage level when a table name is altered by `ALTER TABLE ... RENAME TO ...`.

### Why are the changes needed?

Currently, when a table name is altered, the table's cache is refreshed (if it exists), but its storage level is not retained. For example:

```scala
def getStorageLevel(tableName: String): StorageLevel = {
  val table = spark.table(tableName)
  val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
  cachedData.cachedRepresentation.cacheBuilder.storageLevel
}

Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
sql(s"CREATE TABLE old USING parquet LOCATION '${path.toURI}'")
sql("CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')")
val oldStorageLevel = getStorageLevel("old")

sql("ALTER TABLE old RENAME TO new")
val newStorageLevel = getStorageLevel("new")
```

`oldStorageLevel` will be `StorageLevel(memory, deserialized, 1 replicas)`, whereas `newStorageLevel` will be `StorageLevel(disk, memory, deserialized, 1 replicas)`, which is the default storage level.

### Does this PR introduce _any_ user-facing change?

Yes, the storage level for the cache is now retained.

### How was this patch tested?

Added a unit test.

Closes #30774 from imback82/alter_table_rename_cache_fix.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ef7f690)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 50e72dc commit f3709a0
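
For context on the fix below: it swaps the no-argument caching call for the `Catalog.cacheTable` overload that takes an explicit `StorageLevel`. A minimal sketch of the difference, assuming a running `spark` session and an already-created table `new` (both illustrative here):

```scala
import org.apache.spark.storage.StorageLevel

// Default overload: re-caches at the session default level
// (MEMORY_AND_DISK), which is how the rename dropped MEMORY_ONLY.
spark.catalog.cacheTable("new")

// Overload used by the fix: re-caches at an explicit level, so the
// level carried by the old cache entry can be passed through.
spark.catalog.cacheTable("new", StorageLevel.MEMORY_ONLY)
```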

File tree

2 files changed: +27 -7 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 7 additions & 7 deletions

```diff
@@ -21,7 +21,6 @@ import java.net.{URI, URISyntaxException}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
@@ -193,18 +192,19 @@ case class AlterTableRenameCommand(
     } else {
       val table = catalog.getTableMetadata(oldName)
       DDLUtils.verifyAlterTableType(catalog, table, isView)
-      // If an exception is thrown here we can just assume the table is uncached;
-      // this can happen with Hive tables when the underlying catalog is in-memory.
-      val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
-      if (wasCached) {
+      // If `optStorageLevel` is defined, the old table was cached.
+      val optCachedData = sparkSession.sharedState.cacheManager.lookupCachedData(
+        sparkSession.table(oldName.unquotedString))
+      val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
+      if (optStorageLevel.isDefined) {
         CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
       catalog.refreshTable(oldName)
       catalog.renameTable(oldName, newName)
-      if (wasCached) {
-        sparkSession.catalog.cacheTable(newName.unquotedString)
+      optStorageLevel.foreach { storageLevel =>
+        sparkSession.catalog.cacheTable(newName.unquotedString, storageLevel)
       }
     }
     Seq.empty[Row]
```
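
The rewritten block above resolves the cache entry through the `CacheManager` instead of `Catalog.isCached`, which yields the entry itself and therefore its storage level. A sketch of that lookup as a standalone helper (the helper name `cachedStorageLevel` is hypothetical; the calls mirror the diff):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical helper mirroring the command's lookup: resolve the table
// to a logical plan, ask the CacheManager for a matching cache entry,
// and read the storage level off the cached relation's builder.
def cachedStorageLevel(spark: SparkSession, name: String): Option[StorageLevel] =
  spark.sharedState.cacheManager
    .lookupCachedData(spark.table(name))
    .map(_.cachedRepresentation.cacheBuilder.storageLevel)
```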

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 20 additions & 0 deletions

```diff
@@ -1285,4 +1285,24 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
     }
   }
+
+  test("SPARK-33786: Cache's storage level should be respected when a table name is altered.") {
+    withTable("old", "new") {
+      withTempPath { path =>
+        def getStorageLevel(tableName: String): StorageLevel = {
+          val table = spark.table(tableName)
+          val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
+          cachedData.cachedRepresentation.cacheBuilder.storageLevel
+        }
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
+        sql(s"CREATE TABLE old USING parquet LOCATION '${path.toURI}'")
+        sql("CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')")
+        val oldStorageLevel = getStorageLevel("old")
+
+        sql("ALTER TABLE old RENAME TO new")
+        val newStorageLevel = getStorageLevel("new")
+        assert(oldStorageLevel === newStorageLevel)
+      }
+    }
+  }
 }
```
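
As a usage note, the assertion in the new test catches exactly the regression from the description: `MEMORY_ONLY` and the default `MEMORY_AND_DISK` differ in their disk-spill flag. A small sketch of the distinction:

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY keeps deserialized blocks in memory with no disk fallback.
assert(StorageLevel.MEMORY_ONLY.useMemory && !StorageLevel.MEMORY_ONLY.useDisk)

// MEMORY_AND_DISK (the default the buggy rename fell back to) spills to disk.
assert(StorageLevel.MEMORY_AND_DISK.useMemory && StorageLevel.MEMORY_AND_DISK.useDisk)
```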
