[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables #16500
Conversation
The actual code changes are just two lines:

```scala
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
```

```scala
if (partition.nonEmpty) {
  sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
}
```
Actually, we can further limit the calls of `refreshTable`. For example, checking whether the format is `parquet` or `orc`.
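A minimal sketch of that idea, assuming it would sit next to the `refreshTable` call in `InsertIntoHiveTable` (so `table` and `sqlContext` are in scope) and that the serde class name is available via `table.catalogTable.storage.serde`; this is not what the PR ended up doing:

```scala
// Hypothetical: only refresh when the Hive serde is Parquet or ORC, since only
// those formats can be converted to data source relations that populate the
// metadata cache.
val serde = table.catalogTable.storage.serde.getOrElse("").toLowerCase
if (partition.nonEmpty && (serde.contains("parquet") || serde.contains("orc"))) {
  sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
}
```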
Why is it safe to restrict this call to the case where `partition.nonEmpty`?
Is this because Hive serde tables do not use the file status cache?
Yeah, I think so, but I don't think it's worth avoiding this `refreshTable` call and adding a lot of comments to explain it. This is too subtle.
@cloud-fan @ericl @mallman For non-partitioned parquet/orc tables, we convert them to data source tables. Thus, they will not call `InsertIntoHiveTable`. I know it is a little bit confusing, but I am fine with reverting it.
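As a hedged illustration of that split (table names are made up; with the default `spark.sql.hive.convertMetastoreParquet=true`, the expectation is that only the partitioned insert keeps using `InsertIntoHiveTable`):

```scala
// Illustrative only: compare the plans of the two write paths in a spark-shell.
spark.sql("CREATE TABLE flat_tab (a STRING) STORED AS PARQUET")
spark.sql("CREATE TABLE part_tab (a STRING) PARTITIONED BY (p INT) STORED AS PARQUET")

// Non-partitioned Parquet table: converted to a data source relation, so the
// write should be planned as InsertIntoHadoopFsRelationCommand.
spark.sql("EXPLAIN INSERT OVERWRITE TABLE flat_tab SELECT 'x'").show(false)

// Partitioned Parquet table: not converted on the write path, so the write
// should stay on InsertIntoHiveTable (hence the refresh discussed here).
spark.sql("EXPLAIN INSERT OVERWRITE TABLE part_tab PARTITION (p = 1) SELECT 'x'").show(false)
```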
Let's revert it first; we should think about caching and refreshing more thoroughly later.
Agree
Test build #71023 has finished for PR 16500 at commit
I'm wondering if we need the metadata cache anymore. Now that we store partitions in the metastore and have a cache for leaf files, what's the benefit of the metadata cache?
I guess the only purpose of the cache now is to associate file-status caches with specific table names. If we removed that, then tables would have to find their file-status cache by path, or we could have a global file cache. Maybe we also use it to hold the computed schema in some cases? Not sure if that is always provided by the metastore.
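For context, the user-facing way to invalidate a table's cached metadata and file listing is an explicit refresh (table name illustrative); both forms below exist in Spark 2.x:

```scala
// Invalidates the cached metadata/file listing so the next query re-lists files.
spark.catalog.refreshTable("tab")
// Equivalent SQL form:
spark.sql("REFRESH TABLE tab")
```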
```scala
  test("Non-partitioned table readable after load") {
    withTable("tab") {
      withTempDir { src =>
        val newPartitionDir = new File(src, "data").getCanonicalPath
```
Why don't we just use `src` as the table path?
```scala
  test("Partitioned table readable after insert") {
```
This and the next test are not needed if we revert https://github.com/apache/spark/pull/16500/files#diff-d579db9a8f27e0bbef37720ab14ec3f6L395
```scala
  test("Explicitly added partitions should be readable after load") {
    withTable("test_added_partitions") {
      withTempDir { src =>
        val newPartitionDir = new File(src, "partition").getCanonicalPath
```
Why don't we just use `src` as the partition path?
Let me think about whether we can improve the existing verification mechanism for both caches in the test cases. It would help us know what the caches actually contain.
Test build #71295 has started for PR 16500 at commit
Test build #71296 has started for PR 16500 at commit
retest this please
Test build #71313 has finished for PR 16500 at commit
Oops... Wrong branch... Need to revert it.
Force-pushed from c27a9af to d2d751b.
Test build #71357 has finished for PR 16500 at commit
Test build #71354 has finished for PR 16500 at commit
```scala
      checkAnswer(
        spark.table("test_added_partitions"),
        Seq(("0", 1), ("1", 1)).toDF("a", "b"))
```
We usually write `checkAnswer(df, Row("0", 1) :: Row("1", 1) :: Nil)`.
Or use `Seq[Row]` like you already did: https://github.com/apache/spark/pull/16500/files#diff-3822214ea47830564619c00c4fe7eb0aR664
: ) That is copied from the other test cases. Let me correct it and the others in this test suite.
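For reference, a minimal sketch of the suggested style applied to this assertion (using the `Seq[Row]` overload of `checkAnswer`; assumes `org.apache.spark.sql.Row` is imported in the suite):

```scala
checkAnswer(
  spark.table("test_added_partitions"),
  Seq(Row("0", 1), Row("1", 1)))
```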
Test build #71382 has finished for PR 16500 at commit
Author: gatorsmile <gatorsmile@gmail.com>

Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.

(cherry picked from commit de62ddf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks, merging to master/2.1!
### What changes were proposed in this pull request?
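The reproduction from this PR's commit message, where `$newPartitionDir` is a local directory containing the new data files to load:

```scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```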
In the above example, the returned result is empty after the table is loaded. The metadata cache can be out of date after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only the `parquet` and `orc` formats face this issue, because Hive serde tables stored as parquet/orc can be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on.

This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writes to partitioned parquet/orc tables still use `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading an out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note that it does not need to refresh the cache for non-partitioned parquet/orc tables, because they do not go through `InsertIntoHiveTable` at all. Based on the comments, this PR keeps the existing logic unchanged; that is, the table is always refreshed whether or not it is partitioned.

### How was this patch tested?

Added test cases in parquetSuites.scala.