[SPARK-17980] [SQL] Fix refreshByPath for converted Hive tables #15521
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,9 +185,10 @@ class CacheManager extends Logging { | |
plan match { | ||
case lr: LogicalRelation => lr.relation match { | ||
case hr: HadoopFsRelation => | ||
+ val prefixToInvalidate = qualifiedPath.toString | ||
val invalidate = hr.location.rootPaths | ||
- .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory)) | ||
- .contains(qualifiedPath) | ||
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString) | ||
+ .exists(_.startsWith(prefixToInvalidate)) | ||
Review comment: Why do we need the prefix resolution? I think it's useful, since it lets users refresh the table path to invalidate the cache for a partitioned data source table, but it's not related to this PR, right?
Reply: You actually need this when metastore partition pruning is disabled for converted Hive tables. Otherwise, the unit test below would fail in that case. (But yeah, we could also leave that alone.) |
||
if (invalidate) hr.location.refresh() | ||
invalidate | ||
case _ => false | ||
|
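For readers following the thread above, the difference between the old exact-match check and the new prefix check can be sketched without Spark or Hadoop. This is only an illustration under stated assumptions: `CachedRelation`, `matchesExactly`, `matchesByPrefix`, and the `file:/warehouse/...` paths are hypothetical names invented here, paths are plain strings rather than Hadoop `Path` objects, and the relation's root paths are assumed to be partition directories under the table path.

```scala
object PrefixInvalidationSketch {
  // Hypothetical stand-in for a cached relation: only its qualified root paths, as strings.
  final case class CachedRelation(rootPaths: Seq[String])

  // Old behavior: the refreshed path must be equal to one of the root paths.
  def matchesExactly(rel: CachedRelation, refreshedPath: String): Boolean =
    rel.rootPaths.contains(refreshedPath)

  // New behavior: any root path that starts with the refreshed path counts as a match.
  def matchesByPrefix(rel: CachedRelation, refreshedPath: String): Boolean =
    rel.rootPaths.exists(_.startsWith(refreshedPath))

  def main(args: Array[String]): Unit = {
    // Assumed layout: the root paths are partition directories under the table path.
    val rel = CachedRelation(Seq(
      "file:/warehouse/test/p=1",
      "file:/warehouse/test/p=2"))
    val tablePath = "file:/warehouse/test"

    println(matchesExactly(rel, tablePath))                  // false: no root path equals the table path
    println(matchesByPrefix(rel, tablePath))                 // true: both root paths start with it
    println(matchesByPrefix(rel, "file:/some-invalid-path")) // false: unrelated path stays a no-op
  }
}
```

Because this is a plain string comparison, a refreshed path such as `file:/warehouse/test` would also match a sibling root path like `file:/warehouse/test2` in this sketch.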
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi | |
val df = spark.sql("select * from test") | ||
assert(sql("select * from test").count() == 5) | ||
|
||
+ def deleteRandomFile(): Unit = { | ||
+ val p = new Path(spark.table("test").inputFiles.head) | ||
+ assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true)) | ||
+ } | ||
|
||
// Delete a file, then assert that we tried to read it. This means the table was cached. | ||
- val p = new Path(spark.table("test").inputFiles.head) | ||
- assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true)) | ||
+ deleteRandomFile() | ||
val e = intercept[SparkException] { | ||
sql("select * from test").count() | ||
} | ||
|
@@ -91,6 +95,19 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi | |
// Test refreshing the cache. | ||
spark.catalog.refreshTable("test") | ||
assert(sql("select * from test").count() == 4) | ||
+ assert(spark.table("test").inputFiles.length == 4) | ||
|
||
+ // Test refresh by path separately since it goes through different code paths than | ||
+ // refreshTable does. | ||
+ deleteRandomFile() | ||
+ spark.catalog.cacheTable("test") | ||
+ spark.catalog.refreshByPath("/some-invalid-path") // no-op | ||
+ val e2 = intercept[SparkException] { | ||
+ sql("select * from test").count() | ||
+ } | ||
+ assert(e2.getMessage.contains("FileNotFoundException")) | ||
+ spark.catalog.refreshByPath(dir.getAbsolutePath) | ||
Review comment: Note: before #14690, users needed to refresh one of the partition paths to invalidate the cache, but now they have to refresh the table path, because […]. I think it's better than before, but it's still a breaking change. Should we document it in the 2.1 release notes?
Reply: Makes sense. To get the old behavior, they can also disable the feature flag. |
||
+ assert(sql("select * from test").count() == 3) | ||
} | ||
} | ||
} | ||
|
Review comment: So we are changing the semantics of REFRESH PATH, right?
Reply: Yeah.