[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables #16500

Closed
@@ -317,6 +317,10 @@ case class LoadDataCommand(
holdDDLTime = false,
isSrcLocal = isLocal)
}

+ // Refresh the metadata cache to ensure the data is visible to the users
+ catalog.refreshTable(targetTable.identifier)

Seq.empty[Row]
}
}
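For context, a rough sketch of the symptom this refreshTable call addresses (not taken from the PR itself): once a Hive table's file listing has been cached, LOAD DATA moves new files into the table directory but a subsequent query can keep returning the stale, possibly empty, result until the cache is invalidated. The table name and path below are made up for illustration.

```scala
// Hypothetical reproduction; table name `sales` and the local path are illustrative only.
spark.sql("CREATE TABLE sales (a STRING) STORED AS PARQUET")
spark.table("sales").count()          // caches an empty file listing for the table

spark.sql("LOAD DATA LOCAL INPATH '/tmp/sales_files' INTO TABLE sales")
spark.table("sales").count()          // before this patch: could still be 0 (stale cached listing)

// Manual workaround that this patch makes unnecessary for LOAD DATA:
spark.catalog.refreshTable("sales")
spark.table("sales").count()          // now reflects the newly loaded files
```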
@@ -246,13 +246,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val logicalRelation = cached.getOrElse {
val sizeInBytes =
metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
- val fileCatalog = {
- val catalog = new CatalogFileIndex(
+ val fileIndex = {
+ val index = new CatalogFileIndex(
sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
- catalog
+ index
} else {
- catalog.filterPartitions(Nil) // materialize all the partitions in memory
+ index.filterPartitions(Nil) // materialize all the partitions in memory
}
}
val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -261,7 +261,7 @@
.filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))

val relation = HadoopFsRelation(
- location = fileCatalog,
+ location = fileIndex,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = bucketSpec,
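The renamed fileIndex is simply Spark's CatalogFileIndex. Below is a minimal sketch of the behavior the if/else above relies on; it is not part of this PR, and it assumes it runs inside Spark's own test code (e.g. a suite mixing in TestHiveSingleton) where these internal classes are accessible, with a Hive table named `t` already created. The table name and the zero size hint are illustrative assumptions.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.CatalogFileIndex

// Look up the catalog metadata for an assumed existing table `t`.
val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val index = new CatalogFileIndex(spark, tableMeta, sizeInBytes = 0L)

// With lazy pruning enabled the listing stays deferred until a query supplies
// partition filters; passing no filters materializes every partition's files,
// which is what the else branch above does.
val materialized = index.filterPartitions(Nil)
println(materialized.allFiles().length)
```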
Expand Up @@ -23,8 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExecutedCommandExec
- import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
- import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
"normal_parquet",
"jt",
"jt_array",
"test_parquet")
"test_parquet")
super.afterAll()
}

test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
Seq(Row("foo", 0), Row("bar", 0)))

// Create partition without data files and check whether it can be read
sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
Seq(Row("foo", 0), Row("bar", 0)))

// Add data files to partition directory and check whether they can be read
sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))

// Check it with pruning predicates
checkAnswer(
sql("SELECT * FROM test_added_partitions where b = 0"),
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
Seq(Row("foo", 0), Row("bar", 0)))
checkAnswer(
sql("SELECT * FROM test_added_partitions where b = 1"),
Seq(("baz", 1)).toDF("a", "b"))
Seq(Row("baz", 1)))
checkAnswer(
sql("SELECT * FROM test_added_partitions where b = 2"),
Seq[(String, Int)]().toDF("a", "b"))
Seq.empty)

// Also verify the inputFiles implementation
assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
}

test("Explicitly added partitions should be readable after load") {
withTable("test_added_partitions") {
withTempDir { src =>
val newPartitionDir = src.getCanonicalPath
spark.range(2).selectExpr("cast(id as string)").toDF("a").write
.mode("overwrite")
.parquet(newPartitionDir)

sql(
"""
|CREATE TABLE test_added_partitions (a STRING)
|PARTITIONED BY (b INT)
|STORED AS PARQUET
""".stripMargin)

// Create partition without data files and check whether it can be read
sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
// This table fetch is to fill the cache with zero leaf files
checkAnswer(spark.table("test_added_partitions"), Seq.empty)

sql(
s"""
|LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
|INTO TABLE test_added_partitions PARTITION(b='1')
""".stripMargin)

checkAnswer(
spark.table("test_added_partitions"),
Seq(Row("0", 1), Row("1", 1)))
}
}
}

test("Non-partitioned table readable after load") {
withTable("tab") {
withTempDir { src =>
val newPartitionDir = src.getCanonicalPath
spark.range(2).selectExpr("cast(id as string)").toDF("a").write
.mode("overwrite")
.parquet(newPartitionDir)

sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
checkAnswer(spark.table("tab"), Seq.empty)

sql(
s"""
|LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
|INTO TABLE tab
""".stripMargin)

checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
}
}
}

test("self-join") {
val table = spark.table("normal_parquet")
val selfJoin = table.as("t1").crossJoin(table.as("t2"))
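If one wanted to strengthen the two LOAD DATA tests added above, the refreshed file listing can also be checked directly through Dataset.inputFiles, mirroring the inputFiles assertion the suite already uses for test_added_partitions. A small sketch (not in the PR), placed hypothetically after the final checkAnswer of the "Non-partitioned table readable after load" test:

```scala
// Hypothetical extra assertions for the `tab` table created in that test:
// the refreshed relation should now report the parquet files that LOAD DATA
// copied into the table directory, and both loaded rows should be visible.
val df = spark.table("tab")
assert(df.inputFiles.nonEmpty)   // file listing was refreshed after LOAD DATA
assert(df.count() == 2)          // matches the spark.range(2) data written above
```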