Commit 010ec2b

ericluzadude authored and committed
[SPARK-17980][SQL] Fix refreshByPath for converted Hive tables
## What changes were proposed in this pull request?

There was a bug introduced in apache#14690 which broke refreshByPath with converted Hive tables (though it turns out it was very difficult to refresh converted Hive tables anyway, since you had to specify the exact path of one of the partitions). This changes refreshByPath to invalidate by prefix instead of exact match, which fixes the issue.

cc sameeragarwal for the refreshByPath changes, mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <ekl@databricks.com>

Closes apache#15521 from ericl/fix-caching.
1 parent 0d6ef14 commit 010ec2b
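
For context, a minimal usage sketch of the behaviour this patch enables (the table name and warehouse path below are illustrative assumptions, not taken from the patch): after the change, spark.catalog.refreshByPath matches cached relations by path prefix, so passing the root directory of a converted Hive table refreshes it, instead of requiring the exact qualified path of one of its partitions.

```scala
// Hypothetical example; "sales" and its warehouse path are assumed names.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("refreshByPath-example")
  .enableHiveSupport()
  .getOrCreate()

spark.catalog.cacheTable("sales")   // partitioned Hive table converted to a data source relation
spark.table("sales").count()        // populates the cache

// Suppose partition files under the table root are rewritten out of band.
// Refreshing by the table's root path (a prefix of every partition path)
// now invalidates and reloads the cached entry; before this patch only an
// exact qualified partition path would have matched.
spark.catalog.refreshByPath("/user/hive/warehouse/sales")

spark.table("sales").count()        // re-reads the refreshed file listing
```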

File tree

5 files changed: +39 −10 lines


sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala

Lines changed: 2 additions & 1 deletion
@@ -343,7 +343,8 @@ abstract class Catalog {
 
   /**
    * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
-   * contains the given data source path.
+   * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+   * everything that is cached.
    *
    * @since 2.0.0
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 3 additions & 2 deletions
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
+          val prefixToInvalidate = qualifiedPath.toString
           val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
-            .contains(qualifiedPath)
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+            .exists(_.startsWith(prefixToInvalidate))
           if (invalidate) hr.location.refresh()
           invalidate
         case _ => false
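
A minimal standalone sketch of the new matching rule (the paths below are made up for illustration): a cached HadoopFsRelation is now invalidated when any of its qualified root paths starts with the qualified path handed to refreshByPath, which is why a table's root directory, or even "/", matches every partition underneath it.

```scala
// Illustrative only; these paths do not come from the patch.
val rootPaths = Seq(
  "hdfs://nn:8020/warehouse/tbl/part=1",
  "hdfs://nn:8020/warehouse/tbl/part=2")

// Mirrors the .exists(_.startsWith(prefixToInvalidate)) check above.
def shouldInvalidate(qualifiedRefreshPath: String): Boolean =
  rootPaths.exists(_.startsWith(qualifiedRefreshPath))

assert(shouldInvalidate("hdfs://nn:8020/warehouse/tbl"))        // table root: matches all partitions
assert(shouldInvalidate("hdfs://nn:8020/warehouse/tbl/part=2")) // exact partition path still matches
assert(!shouldInvalidate("hdfs://nn:8020/elsewhere"))           // unrelated prefix: no match
```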

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala

Lines changed: 14 additions & 4 deletions
@@ -48,13 +48,18 @@ class TableFileCatalog(
 
   private val baseLocation = catalogTable.storage.locationUri
 
+  // Populated on-demand by calls to cachedAllPartitions
+  private var cachedAllPartitions: ListingFileCatalog = null
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
   override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
     filterPartitions(filters).listFiles(Nil)
   }
 
-  override def refresh(): Unit = {}
+  override def refresh(): Unit = synchronized {
+    cachedAllPartitions = null
+  }
 
   /**
    * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -64,7 +69,7 @@ class TableFileCatalog(
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
     if (filters.isEmpty) {
-      cachedAllPartitions
+      allPartitions
    } else {
      filterPartitions0(filters)
    }
@@ -89,9 +94,14 @@ class TableFileCatalog(
   }
 
   // Not used in the hot path of queries when metastore partition pruning is enabled
-  lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+  def allPartitions: ListingFileCatalog = synchronized {
+    if (cachedAllPartitions == null) {
+      cachedAllPartitions = filterPartitions0(Nil)
+    }
+    cachedAllPartitions
+  }
 
-  override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+  override def inputFiles: Array[String] = allPartitions.inputFiles
 }
 
 /**
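
The switch from a lazy val to a nullable field behind a synchronized accessor is what lets refresh() actually drop the cached partition listing, since a Scala lazy val is computed once and cannot be reset. A generic sketch of the pattern, with illustrative names that are not the patch's code:

```scala
// Resettable lazily-computed cache, illustrative only.
class ResettableListing(compute: () => Seq[String]) {
  private var cached: Seq[String] = null   // populated on demand

  def get: Seq[String] = synchronized {
    if (cached == null) {
      cached = compute()                   // computed at most once between refreshes
    }
    cached
  }

  def refresh(): Unit = synchronized {
    cached = null                          // the next get recomputes the listing
  }
}
```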

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         if (lazyPruningEnabled) {
           catalog
         } else {
-          catalog.cachedAllPartitions
+          catalog.allPartitions
         }
       }
       val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala

Lines changed: 19 additions & 2 deletions
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
         val df = spark.sql("select * from test")
         assert(sql("select * from test").count() == 5)
 
+        def deleteRandomFile(): Unit = {
+          val p = new Path(spark.table("test").inputFiles.head)
+          assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+        }
+
         // Delete a file, then assert that we tried to read it. This means the table was cached.
-        val p = new Path(spark.table("test").inputFiles.head)
-        assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+        deleteRandomFile()
         val e = intercept[SparkException] {
           sql("select * from test").count()
         }
@@ -91,6 +95,19 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
         // Test refreshing the cache.
         spark.catalog.refreshTable("test")
         assert(sql("select * from test").count() == 4)
+        assert(spark.table("test").inputFiles.length == 4)
+
+        // Test refresh by path separately since it goes through different code paths than
+        // refreshTable does.
+        deleteRandomFile()
+        spark.catalog.cacheTable("test")
+        spark.catalog.refreshByPath("/some-invalid-path") // no-op
+        val e2 = intercept[SparkException] {
+          sql("select * from test").count()
+        }
+        assert(e2.getMessage.contains("FileNotFoundException"))
+        spark.catalog.refreshByPath(dir.getAbsolutePath)
+        assert(sql("select * from test").count() == 3)
       }
     }
   }
