
[SPARK-17980] [SQL] Fix refreshByPath for converted Hive tables #15521


Closed · wants to merge 2 commits
Catalog.scala
@@ -343,7 +343,8 @@ abstract class Catalog {

   /**
    * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
-   * contains the given data source path.
+   * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
Contributor commented:

So we are changing the semantics of REFRESH PATH, right?

Author replied:

Yeah.

+   * everything that is cached.
    *
    * @since 2.0.0
    */
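As a quick illustration of the new contract, here is a minimal usage sketch (the demo object, application name, and /tmp/refresh_demo path are ours, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

object RefreshByPathDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("refresh-by-path-demo")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical location; any cached file-source relation rooted under it is affected.
    val path = "/tmp/refresh_demo"
    spark.range(10).write.mode("overwrite").parquet(path)

    val df = spark.read.parquet(path)
    df.cache()
    df.count() // materialize the cache

    // Previously, only an exact match on a relation's root path triggered invalidation.
    // With this PR, any cached relation whose qualified root path starts with the given
    // prefix is uncached and its file listing refreshed, so a parent directory works too.
    spark.catalog.refreshByPath("/tmp")

    spark.stop()
  }
}
```

In particular, passing "/" matches every qualified root path, which is the "invalidate everything" case the updated scaladoc calls out.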
CacheManager.scala
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
+          val prefixToInvalidate = qualifiedPath.toString
           val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
-            .contains(qualifiedPath)
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+            .exists(_.startsWith(prefixToInvalidate))
cloud-fan (Contributor) commented on Oct 18, 2016:

Why do we need the prefix resolution? I think it's useful, since it lets users refresh the table path to invalidate the cache for a partitioned data source table, but isn't it unrelated to this PR?

ericl (Author) replied on Oct 18, 2016:

You actually need this when metastore partition pruning is disabled for converted Hive tables; otherwise, the unit test below would fail in that case.

(But yeah, we could also leave that alone.)

          if (invalidate) hr.location.refresh()
          invalidate
        case _ => false
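To make the matching rule above concrete, here is a standalone sketch against Hadoop's FileSystem API (PrefixMatchSketch and shouldInvalidate are our names; only the Path.makeQualified call and the startsWith comparison mirror the diff):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object PrefixMatchSketch {
  // Qualify both sides against the same filesystem (resolving scheme, authority,
  // and relative paths), then compare as strings, as the change above does.
  def shouldInvalidate(rootPaths: Seq[Path], pathToRefresh: Path, fs: FileSystem): Boolean = {
    val prefix = pathToRefresh.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
    rootPaths
      .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
      .exists(_.startsWith(prefix))
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration()) // local filesystem by default
    val partitionRoots = Seq(
      new Path("/warehouse/db/tab/part=1"),
      new Path("/warehouse/db/tab/part=2"))

    // Refreshing the table directory matches both partition roots by prefix.
    assert(shouldInvalidate(partitionRoots, new Path("/warehouse/db/tab"), fs))
    // An unrelated path matches nothing.
    assert(!shouldInvalidate(partitionRoots, new Path("/other"), fs))
  }
}
```

One caveat worth noting: plain string prefix matching is loose, so refreshing /warehouse/db/tab would also match a sibling root such as /warehouse/db/tab2.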
TableFileCatalog.scala
@@ -49,13 +49,18 @@ class TableFileCatalog(

   private val baseLocation = catalogTable.storage.locationUri

+  // Populated on-demand by calls to cachedAllPartitions
+  private var cachedAllPartitions: ListingFileCatalog = null
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq

   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     filterPartitions(filters).listFiles(Nil)
   }

-  override def refresh(): Unit = {}
+  override def refresh(): Unit = synchronized {
+    cachedAllPartitions = null
+  }

  /**
   * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -65,7 +70,7 @@
   */
  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
    if (filters.isEmpty) {
-      cachedAllPartitions
+      allPartitions
    } else {
      filterPartitions0(filters)
    }
@@ -90,9 +95,14 @@
}

   // Not used in the hot path of queries when metastore partition pruning is enabled
-  lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+  def allPartitions: ListingFileCatalog = synchronized {
+    if (cachedAllPartitions == null) {
+      cachedAllPartitions = filterPartitions0(Nil)
+    }
+    cachedAllPartitions
+  }

-  override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+  override def inputFiles: Array[String] = allPartitions.inputFiles
 }

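The substance of this file's change is swapping an un-resettable cache for a resettable one: a Scala `lazy val` is computed at most once and can never be cleared, which is why the old `refresh()` was a no-op. A minimal sketch of the replacement pattern in isolation (RefreshableCache and its members are our names, not Spark code):

```scala
// A lazily computed, invalidatable cache: the nullable field plus `synchronized`
// gives the same on-demand semantics as `lazy val`, but can be dropped and rebuilt.
class RefreshableCache[T >: Null](compute: () => T) {
  private var cached: T = null

  def get: T = synchronized {
    if (cached == null) {
      cached = compute() // lazily (re)populate on first access after a refresh
    }
    cached
  }

  def refresh(): Unit = synchronized {
    cached = null // next get recomputes
  }
}

object RefreshableCacheDemo {
  def main(args: Array[String]): Unit = {
    var computations = 0
    val cache = new RefreshableCache[String](() => { computations += 1; s"listing#$computations" })
    assert(cache.get == "listing#1")
    assert(cache.get == "listing#1") // cached; compute not called again
    cache.refresh()                  // drop the cached value, like the new refresh()
    assert(cache.get == "listing#2") // lazily recomputed on next access
  }
}
```

The `synchronized` guards in `get` and `refresh()` mirror the PR's `allPartitions`/`refresh()` pair, so concurrent readers see either the old listing or a freshly computed one, never a torn update.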
HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
        if (lazyPruningEnabled) {
          catalog
        } else {
-          catalog.cachedAllPartitions
+          catalog.allPartitions
        }
      }
      val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
HiveMetadataCacheSuite.scala
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
      val df = spark.sql("select * from test")
      assert(sql("select * from test").count() == 5)

+      def deleteRandomFile(): Unit = {
+        val p = new Path(spark.table("test").inputFiles.head)
+        assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+      }
+
      // Delete a file, then assert that we tried to read it. This means the table was cached.
-      val p = new Path(spark.table("test").inputFiles.head)
-      assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+      deleteRandomFile()
      val e = intercept[SparkException] {
        sql("select * from test").count()
      }
@@ -91,6 +95,19 @@
      // Test refreshing the cache.
      spark.catalog.refreshTable("test")
      assert(sql("select * from test").count() == 4)
+      assert(spark.table("test").inputFiles.length == 4)
+
+      // Test refresh by path separately since it goes through different code paths than
+      // refreshTable does.
+      deleteRandomFile()
+      spark.catalog.cacheTable("test")
+      spark.catalog.refreshByPath("/some-invalid-path") // no-op
+      val e2 = intercept[SparkException] {
+        sql("select * from test").count()
+      }
+      assert(e2.getMessage.contains("FileNotFoundException"))
+      spark.catalog.refreshByPath(dir.getAbsolutePath)
Contributor commented:

Note: before #14690, users needed to refresh one of the partition paths to invalidate the cache, but now they have to refresh the table path, because TableFileCatalog.rootPaths contains only the table path, while ListingFileCatalog.rootPaths contains only the partition paths.

I think this is better than before, but it is still a breaking change; should we document it in the 2.1 release notes?

Author replied:

Makes sense. To get the old behavior, users can also disable the feature flag.

+      assert(sql("select * from test").count() == 3)
    }
  }
}
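For readers who want to reproduce the behavior outside the test harness, here is a condensed, standalone approximation of the scenario above using only public APIs (it substitutes a plain parquet-backed temp view for the converted Hive table; the path, view name, and object name are ours):

```scala
import org.apache.spark.sql.SparkSession

object RefreshByPathScenario {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("refresh-by-path-scenario")
      .master("local[*]")
      .getOrCreate()

    // Write a small partitioned dataset to a temporary directory.
    val dir = java.nio.file.Files.createTempDirectory("refresh_demo").toFile
    spark.range(5).selectExpr("id", "id as part")
      .write.mode("overwrite").partitionBy("part").parquet(dir.getAbsolutePath)
    spark.read.parquet(dir.getAbsolutePath).createOrReplaceTempView("test")

    spark.catalog.cacheTable("test")
    assert(spark.sql("select * from test").count() == 5)

    // Refreshing an unrelated prefix leaves the cached file listing untouched...
    spark.catalog.refreshByPath("/some-invalid-path")
    // ...while refreshing the table's own directory re-lists files and rebuilds the cache.
    spark.catalog.refreshByPath(dir.getAbsolutePath)
    assert(spark.sql("select * from test").count() == 5)

    spark.stop()
  }
}
```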