
Commit db37049

gatorsmile authored and cloud-fan committed
[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables
[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables

```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```

In the example above, the query returns an empty result after the data is loaded. The metadata cache can become outdated after new data is loaded into the table, because loading/inserting does not update the cache.

So far, the metadata cache is only used for data source tables. Thus, among Hive serde tables, only the `parquet` and `orc` formats are affected, because Hive serde tables in the parquet/orc formats are converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on. This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writing partitioned parquet/orc tables still uses `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note that non-partitioned parquet/orc tables do not need this refresh, because they do not go through `InsertIntoHiveTable` at all.

Based on the review comments, this PR keeps the existing logic unchanged: the table is always refreshed, whether or not it is partitioned.

Added test cases in parquetSuites.scala.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.

(cherry picked from commit de62ddf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 5e9be1e commit db37049
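
For readers hitting this on builds that do not include the fix, the stale result in the example above can be worked around by refreshing the table manually through the public API. A minimal sketch, assuming a running `SparkSession` named `spark` and the `tab` table from the commit message:

```scala
// Workaround for builds without this fix: force the cached metadata (file
// listing) for the table to be re-read before querying it again.
spark.catalog.refreshTable("tab")   // public Catalog API
spark.sql("REFRESH TABLE tab")      // equivalent SQL form

// Subsequent scans pick up the newly loaded files.
spark.table("tab").show()
```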

File tree

3 files changed: +75 -14 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 4 additions & 0 deletions
@@ -305,6 +305,10 @@ case class LoadDataCommand(
         isOverwrite,
         holdDDLTime = false)
     }
+
+    // Refresh the metadata cache to ensure the data visible to the users
+    catalog.refreshTable(targetTable.identifier)
+
     Seq.empty[Row]
   }
 }
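
`LoadDataCommand` backs the `LOAD DATA` statement for both non-partitioned and partitioned targets, so the refresh added above covers both paths. A minimal sketch of statements that go through this command, with hypothetical paths and table names:

```scala
// Hypothetical examples of statements handled by LoadDataCommand. With the
// change above, each of them ends by refreshing the table's metadata cache,
// so the next scan re-lists the files that were just loaded.
spark.sql("LOAD DATA LOCAL INPATH '/tmp/new_files' OVERWRITE INTO TABLE tab")

spark.sql(
  """LOAD DATA LOCAL INPATH '/tmp/new_files' OVERWRITE
    |INTO TABLE test_added_partitions PARTITION (b='1')
  """.stripMargin)
```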

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 5 additions & 5 deletions
@@ -233,13 +233,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

       val logicalRelation = cached.getOrElse {
         val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
-        val fileCatalog = {
-          val catalog = new CatalogFileIndex(
+        val fileIndex = {
+          val index = new CatalogFileIndex(
             sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
-            catalog
+            index
           } else {
-            catalog.filterPartitions(Nil) // materialize all the partitions in memory
+            index.filterPartitions(Nil) // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -248,7 +248,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))

         val relation = HadoopFsRelation(
-          location = fileCatalog,
+          location = fileIndex,
           partitionSchema = partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = bucketSpec,
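
The renamed code only runs when a metastore Parquet/ORC table is converted to a data source relation, and it is that converted relation (with its `CatalogFileIndex`) that gets cached. The conversion is gated by the configuration flags mentioned in the commit message; a brief sketch of how they are set (defaults vary by version, so treat this as illustrative):

```scala
// The conversion to a cached HadoopFsRelation happens only when these flags
// are on (spark.sql.hive.convertMetastoreParquet defaults to true; the ORC
// flag's default depends on the Spark version).
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")

// Disabling the flag forces reads through the Hive serde path, which bypasses
// this cache entirely (a coarse workaround, at some performance cost).
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
```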

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 66 additions & 9 deletions
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       "normal_parquet",
       "jt",
       "jt_array",
-      "test_parquet")
+      "test_parquet")
+    super.afterAll()
   }

   test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))

         // Create partition without data files and check whether it can be read
         sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))

         // Add data files to partition directory and check whether they can be read
         sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))

         // Check it with pruning predicates
         checkAnswer(
           sql("SELECT * FROM test_added_partitions where b = 0"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
         checkAnswer(
           sql("SELECT * FROM test_added_partitions where b = 1"),
-          Seq(("baz", 1)).toDF("a", "b"))
+          Seq(Row("baz", 1)))
         checkAnswer(
           sql("SELECT * FROM test_added_partitions where b = 2"),
-          Seq[(String, Int)]().toDF("a", "b"))
+          Seq.empty)

         // Also verify the inputFiles implementation
         assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }

+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
+
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin = table.as("t1").crossJoin(table.as("t2"))
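
An incidental cleanup in the hunks above replaces `.toDF(...)` expected answers with plain `Seq[Row]` values; both forms are accepted by the `checkAnswer` helper these suites inherit from `QueryTest`. A minimal illustration, assuming a `QueryTest`-based suite with `spark`, `sql`, and `testImplicits` in scope:

```scala
// Both expected-answer styles work with QueryTest.checkAnswer; the change
// above simply standardizes on the Seq[Row] form.
import testImplicits._  // brings in .toDF on local Seqs of tuples

checkAnswer(
  sql("SELECT * FROM test_added_partitions WHERE b = 0"),
  Seq(Row("foo", 0), Row("bar", 0)))               // Seq[Row] expected answer

checkAnswer(
  sql("SELECT * FROM test_added_partitions WHERE b = 0"),
  Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))      // DataFrame expected answer
```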

0 commit comments
