File tree Expand file tree Collapse file tree 2 files changed +29
-4
lines changed
main/scala/org/apache/spark/sql/execution/command
test/scala/org/apache/spark/sql Expand file tree Collapse file tree 2 files changed +29
-4
lines changed Original file line number Diff line number Diff line change @@ -195,16 +195,21 @@ case class AlterTableRenameCommand(
195
195
DDLUtils .verifyAlterTableType(catalog, table, isView)
196
196
// If an exception is thrown here we can just assume the table is uncached;
197
197
// this can happen with Hive tables when the underlying catalog is in-memory.
198
- val wasCached = Try (sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false )
199
- if (wasCached) {
198
+ // If `optStorageLevel` is defined, the old table was cached.
199
+ val optStorageLevel = Try {
200
+ val optCachedData = sparkSession.sharedState.cacheManager.lookupCachedData(
201
+ sparkSession.table(oldName.unquotedString))
202
+ optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
203
+ }.getOrElse(None )
204
+ optStorageLevel.foreach { _ =>
200
205
CommandUtils .uncacheTableOrView(sparkSession, oldName.unquotedString)
201
206
}
202
207
// Invalidate the table last, otherwise uncaching the table would load the logical plan
203
208
// back into the hive metastore cache
204
209
catalog.refreshTable(oldName)
205
210
catalog.renameTable(oldName, newName)
206
- if (wasCached) {
207
- sparkSession.catalog.cacheTable(newName.unquotedString)
211
+ optStorageLevel.foreach { storageLevel =>
212
+ sparkSession.catalog.cacheTable(newName.unquotedString, storageLevel )
208
213
}
209
214
}
210
215
Seq .empty[Row ]
Original file line number Diff line number Diff line change @@ -1285,4 +1285,24 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
1285
1285
assert(spark.sharedState.cacheManager.lookupCachedData(sql(" select 1, 2" )).isDefined)
1286
1286
}
1287
1287
}
1288
+
1289
+ test(" SPARK-33786: Cache's storage level should be respected when a table name is altered." ) {
1290
+ withTable(" old" , " new" ) {
1291
+ withTempPath { path =>
1292
+ def getStorageLevel (tableName : String ): StorageLevel = {
1293
+ val table = spark.table(tableName)
1294
+ val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
1295
+ cachedData.cachedRepresentation.cacheBuilder.storageLevel
1296
+ }
1297
+ Seq (1 -> " a" ).toDF(" i" , " j" ).write.parquet(path.getCanonicalPath)
1298
+ sql(s " CREATE TABLE old USING parquet LOCATION ' ${path.toURI}' " )
1299
+ sql(" CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')" )
1300
+ val oldStorageLevel = getStorageLevel(" old" )
1301
+
1302
+ sql(" ALTER TABLE old RENAME TO new" )
1303
+ val newStorageLevel = getStorageLevel(" new" )
1304
+ assert(oldStorageLevel === newStorageLevel)
1305
+ }
1306
+ }
1307
+ }
1288
1308
}
You can’t perform that action at this time.
0 commit comments