
Commit e788897

gatorsmile authored and cmonkey committed
[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables
### What changes were proposed in this pull request?

```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```

In the above example, the returned result is empty after the table is loaded. The metadata cache can become out of date after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only the `parquet` and `orc` formats face this issue, because Hive serde tables in parquet/orc format can be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on. This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writing partitioned parquet/orc tables still uses `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note that the cache does not need to be refreshed for non-partitioned parquet/orc tables, because they do not go through `InsertIntoHiveTable` at all. Based on the review comments, this PR keeps the existing logic unchanged: the table is always refreshed, whether or not it is partitioned.

### How was this patch tested?

Added test cases in parquetSuites.scala

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#16500 from gatorsmile/refreshInsertIntoHiveTable.
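To make the failure mode concrete, here is a hedged sketch of the manual workaround available before this fix: calling `spark.catalog.refreshTable` (or running the `REFRESH TABLE` SQL statement) drops the stale cached file listing, which is what the patch now does automatically after `LOAD DATA`. The snippet assumes a Hive-enabled `SparkSession` named `spark` and a local directory `newPartitionDir` containing parquet files, as in the example above; it is an illustration, not part of the patch.

```scala
// Sketch only: reproducing the stale-cache behavior and the manual workaround
// on builds that do not yet include this fix.
spark.sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This read fills the metadata cache with an empty file listing
spark.table("tab").show()

spark.sql(s"LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE INTO TABLE tab")

// Without the fix, this still shows no rows because the cached listing is reused
spark.table("tab").show()

// Manual workaround: invalidate the cached metadata, then the rows appear
spark.catalog.refreshTable("tab")
spark.table("tab").show()
```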
1 parent 1191048 commit e788897

3 files changed (+75, −14 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -317,6 +317,10 @@ case class LoadDataCommand(
         holdDDLTime = false,
         isSrcLocal = isLocal)
     }
+
+    // Refresh the metadata cache to ensure the data visible to the users
+    catalog.refreshTable(targetTable.identifier)
+
     Seq.empty[Row]
   }
 }
```
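For context on the call being added: `SessionCatalog.refreshTable` invalidates any cached metadata or relation entry for the given identifier, so the next lookup of the table relists its files. Below is a minimal sketch of invoking it directly; the table and database names are placeholders, and on some Spark versions `sessionState` is `private[sql]`, in which case the public `spark.catalog.refreshTable` shown earlier is the supported route.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

// Sketch: the same invalidation that LoadDataCommand now performs, done by hand.
// "tab" / "default" are placeholder names; `sessionState` may not be accessible
// from user code on every Spark version.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val catalog = spark.sessionState.catalog
catalog.refreshTable(TableIdentifier("tab", Some("default")))
```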

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -246,13 +246,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val logicalRelation = cached.getOrElse {
       val sizeInBytes =
         metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
-      val fileCatalog = {
-        val catalog = new CatalogFileIndex(
+      val fileIndex = {
+        val index = new CatalogFileIndex(
           sparkSession, metastoreRelation.catalogTable, sizeInBytes)
         if (lazyPruningEnabled) {
-          catalog
+          index
         } else {
-          catalog.filterPartitions(Nil) // materialize all the partitions in memory
+          index.filterPartitions(Nil) // materialize all the partitions in memory
         }
       }
       val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -261,7 +261,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))

       val relation = HadoopFsRelation(
-        location = fileCatalog,
+        location = fileIndex,
         partitionSchema = partitionSchema,
         dataSchema = dataSchema,
         bucketSpec = bucketSpec,
```
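The rename above sits in the code path that converts a Hive parquet/orc table into a `HadoopFsRelation` backed by a `CatalogFileIndex`; only this converted read path uses the cached file listing that the patch refreshes. A hedged way to check whether a given table is read through that path, assuming a Hive-enabled session and an existing parquet table `tab` (names are illustrative):

```scala
// Sketch: inspect the physical plan to see whether the conversion applied.
// With spark.sql.hive.convertMetastoreParquet=true the scan should be a
// file-source scan (e.g. FileSourceScanExec) rather than a HiveTableScanExec.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
val plan = spark.table("tab").queryExecution.executedPlan
println(plan)
```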

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 66 additions & 9 deletions
```diff
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       "normal_parquet",
       "jt",
       "jt_array",
-      "test_parquet")
+      "test_parquet")
+    super.afterAll()
   }

   test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
-        Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0)))

       // Create partition without data files and check whether it can be read
       sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
-        Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0)))

       // Add data files to partition directory and check whether they can be read
       sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
-        Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))

       // Check it with pruning predicates
       checkAnswer(
         sql("SELECT * FROM test_added_partitions where b = 0"),
-        Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0)))
       checkAnswer(
         sql("SELECT * FROM test_added_partitions where b = 1"),
-        Seq(("baz", 1)).toDF("a", "b"))
+        Seq(Row("baz", 1)))
       checkAnswer(
         sql("SELECT * FROM test_added_partitions where b = 2"),
-        Seq[(String, Int)]().toDF("a", "b"))
+        Seq.empty)

       // Also verify the inputFiles implementation
       assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }

+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
+
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin = table.as("t1").crossJoin(table.as("t2"))
```
