
[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables #16500


Closed

Conversation

gatorsmile
Member

@gatorsmile gatorsmile commented Jan 8, 2017

What changes were proposed in this pull request?

        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

        // This table fetch is to fill the cache with zero leaf files
        spark.table("tab").show()

        sql(
          s"""
             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
             |INTO TABLE tab
           """.stripMargin)

        spark.table("tab").show()

In the above example, the returned result is empty after the table is loaded. The metadata cache can be out of date after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only the `parquet` and `orc` formats face this issue, because Hive serde tables in parquet/orc format can be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on.
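Until the cache is refreshed, a manual workaround is to invalidate the cached relation explicitly after the load. A minimal sketch using the public `Catalog` API (this is standard Spark API, not part of this PR's change):

```Scala
// Manual workaround (not part of this PR): drop the cached metadata and file
// listing for `tab` so that the next scan re-lists the table's files.
spark.catalog.refreshTable("tab")

// The subsequent read rebuilds the cache and sees the newly loaded data.
spark.table("tab").show()
```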

This PR is to refresh the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writes to partitioned parquet/orc tables still use `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading an out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note that it does not need to refresh the cache for non-partitioned parquet/orc tables, because they do not go through `InsertIntoHiveTable` at all. Based on the review comments, this PR keeps the existing logic unchanged; that is, we always refresh the table, whether or not it is partitioned.

How was this patch tested?

Added test cases in parquetSuites.scala
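A sketch of the kind of regression test added, reconstructed from the fragments quoted in the review below; the exact body is an assumption, and it presumes the suite mixes in QueryTest and SQLTestUtils (for `checkAnswer`, `withTable`, `withTempDir`) with the usual imports:

```Scala
// Sketch only: the real test lives in parquetSuites.scala; the details here
// are reconstructed and may differ from the committed version.
test("Non-partitioned table readable after load") {
  withTable("tab") {
    withTempDir { src =>
      val newPartitionDir = new File(src, "data").getCanonicalPath
      // Write a small parquet dataset for LOAD DATA to pick up.
      spark.range(2).selectExpr("cast(id as string) as a")
        .write.parquet(newPartitionDir)

      sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
      // Fill the metadata cache with an empty file listing.
      spark.table("tab").show()

      sql(
        s"""
           |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
           |INTO TABLE tab
         """.stripMargin)

      // Without the refresh added by this PR, this read would return no rows.
      checkAnswer(spark.table("tab"), Row("0") :: Row("1") :: Nil)
    }
  }
}
```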

@gatorsmile
Member Author

cc @ericl @cloud-fan @mallman

The actual code changes are just two lines.

```Scala
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

if (partition.nonEmpty) {
  sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
}
```
Member Author
Actually, we can further limit the calls to `refreshTable`, for example by checking whether the format is parquet or orc.
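A hypothetical sketch of such a check, written against the snippet quoted above (so `table`, `partition`, and `sqlContext` come from that context; matching on the serde string is an assumption, not what the PR does):

```Scala
// Hypothetical sketch (not in this PR): refresh only when the table's serde is
// parquet or orc, i.e. a format that the read path may have converted into a
// cached data source relation.
val serde = table.catalogTable.storage.serde.getOrElse("").toLowerCase
if (partition.nonEmpty && (serde.contains("parquet") || serde.contains("orc"))) {
  sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
}
```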

Contributor
Why is it safe to restrict this call to the case where partition.nonEmpty?

Contributor
Is this because Hive serde tables do not use the file status cache?

Contributor

@cloud-fan cloud-fan Jan 12, 2017
Yeah, I think so, but I don't think it's worth avoiding this `refreshTable` call and adding a lot of comments to explain it. This is too subtle.

Member Author

@gatorsmile gatorsmile Jan 13, 2017
@cloud-fan @ericl @mallman For non-partitioned parquet/orc tables, we convert them to data source tables, so writes to them never go through `InsertIntoHiveTable`.
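An illustrative way to check which write path a table takes (a sketch assuming a Hive-enabled session; the table name is hypothetical and this is not part of the PR):

```Scala
// Illustrative sketch (assumption, not from this PR): with conversion enabled,
// the plan for a write to a non-partitioned parquet Hive table should show the
// data source write (InsertIntoHadoopFsRelationCommand), not InsertIntoHiveTable.
sql("SET spark.sql.hive.convertMetastoreParquet=true")
sql("CREATE TABLE nonpart_tab (a STRING) STORED AS PARQUET")
sql("EXPLAIN INSERT INTO TABLE nonpart_tab VALUES ('x')").show(false)
```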

I know it is a little bit confusing, but I am fine with reverting it.

Contributor
Let's revert it first; we should think about caching and refreshing more thoroughly later.

Member Author
Agree

@gatorsmile gatorsmile changed the title from "[SPARK-19120] [SPARK-19121] Refresh Metadata Cache After Load Partitioned Hive Tables" to "[SPARK-19120] [SPARK-19121] Refresh Metadata Cache After Loading Hive Tables" on Jan 8, 2017
@SparkQA

SparkQA commented Jan 8, 2017

Test build #71023 has finished for PR 16500 at commit 0f70e91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

I'm wondering if we need the metadata cache anymore. Now that we store partitions in the metastore and have a cache for leaf files, what's the benefit of the metadata cache?

@ericl
Contributor

ericl commented Jan 12, 2017

I guess the only purpose of the cache now is to associate file-status caches with specific table names. If we removed that, then tables would have to find their file-status cache by path, or we could have a global file cache.

Maybe we also use it to hold the computed schema in some cases? Not sure if that is always provided by the metastore.

test("Non-partitioned table readable after load") {
withTable("tab") {
withTempDir { src =>
val newPartitionDir = new File(src, "data").getCanonicalPath
Contributor
Why don't we just use `src` as the table path?

```Scala
  }
}

test("Partitioned table readable after insert") {
```
Contributor
test("Explicitly added partitions should be readable after load") {
withTable("test_added_partitions") {
withTempDir { src =>
val newPartitionDir = new File(src, "partition").getCanonicalPath
Contributor
Why don't we just use `src` as the partition path?

@gatorsmile
Member Author

Let me think about whether we can improve the existing verification mechanism for both caches in the test cases. It would help us know what the caches actually contain.

@gatorsmile gatorsmile changed the title from "[SPARK-19120] [SPARK-19121] Refresh Metadata Cache After Loading Hive Tables" to "[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables" on Jan 13, 2017
@SparkQA

SparkQA commented Jan 13, 2017

Test build #71295 has started for PR 16500 at commit 11507cc.

@SparkQA

SparkQA commented Jan 13, 2017

Test build #71296 has started for PR 16500 at commit 203e36c.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jan 13, 2017

Test build #71313 has finished for PR 16500 at commit 203e36c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

oops... Wrong branch... Need to revert it.

@gatorsmile gatorsmile force-pushed the refreshInsertIntoHiveTable branch from c27a9af to d2d751b on January 14, 2017 00:29
@SparkQA

SparkQA commented Jan 14, 2017

Test build #71357 has finished for PR 16500 at commit d2d751b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 14, 2017

Test build #71354 has finished for PR 16500 at commit c27a9af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


```Scala
checkAnswer(
  spark.table("test_added_partitions"),
  Seq(("0", 1), ("1", 1)).toDF("a", "b"))
```
Contributor
We usually write `checkAnswer(df, Row("0", 1) :: Row("1", 1) :: Nil)`.

Member Author
: ) That is copied from the other test cases. Let me correct it and the others in this test suite.

@SparkQA

SparkQA commented Jan 14, 2017

Test build #71382 has finished for PR 16500 at commit 14da2b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in de62ddf Jan 15, 2017
asfgit pushed a commit that referenced this pull request Jan 15, 2017
Author: gatorsmile <gatorsmile@gmail.com>

Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.

(cherry picked from commit de62ddf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/2.1!

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017