
[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata #5339

Closed
wants to merge 3 commits into from
@@ -106,7 +106,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}

override def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
// refreshTable does not eagerly reload the cache. It just invalidates the cache.
// The next time the table is used, it will be populated in the cache.
Contributor


perhaps mention why this is important

// Since we also cache ParquetRelations converted from Hive Parquet tables and
// adding converted ParquetRelations into the cache is not defined in the load function
// of the cache (instead, we add the cache entry in convertToParquetRelation),
// it is better to invalidate the cache here to avoid confusing warning logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
// data source tables).
invalidateTable(databaseName, tableName)
}
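For context on the change above: converted ParquetRelations are put into cachedDataSourceTables by convertToParquetRelation rather than by the cache's load function, so an eager refresh would run a loader that does not know how to build them. Below is a minimal, hypothetical sketch (plain Guava only; TableKey and the string values are stand-ins, not the Spark classes) of how refresh() and invalidate() differ on a LoadingCache:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object CacheRefreshVsInvalidate {
  case class TableKey(database: String, table: String)

  // Hypothetical loader: it only knows how to build "data source" entries, mirroring
  // how the real load function cannot construct converted ParquetRelations.
  private val loader = new CacheLoader[TableKey, String] {
    override def load(key: TableKey): String = {
      println(s"loader invoked for $key") // the step that would emit the confusing warning
      s"data source relation for $key"
    }
  }

  private val cache: LoadingCache[TableKey, String] =
    CacheBuilder.newBuilder().maximumSize(100).build(loader)

  def main(args: Array[String]): Unit = {
    val key = TableKey("default", "test_parquet")
    // An entry added outside the loader, the way convertToParquetRelation adds one:
    cache.put(key, "converted ParquetRelation")

    cache.refresh(key)    // eagerly re-runs the loader and replaces the converted entry
    cache.invalidate(key) // simply drops the entry; nothing runs until the next lookup
  }
}
```

In the real code, going through the loader for a converted table is what would produce the "cannot find data source provider" warning mentioned in the comment, which is why invalidateTable is the safer choice here.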

def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -213,13 +221,48 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)

// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
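To make the NOTE concrete: data source options are a Map[String, String], so the metastore schema has to round-trip through JSON and is rebuilt inside ParquetRelation2, where the case-insensitivity mismatch is reconciled. A small sketch of that round-trip (the "metastoreSchema" key and the example fields are made up for illustration; the real code uses ParquetRelation2.METASTORE_SCHEMA):

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

object SchemaAsJsonOption {
  def main(args: Array[String]): Unit = {
    val metastoreSchema = StructType(Seq(
      StructField("intField", IntegerType, nullable = true),
      StructField("stringField", StringType, nullable = true)))

    // The options map only carries strings, so the schema goes in as JSON text...
    val options: Map[String, String] = Map("metastoreSchema" -> metastoreSchema.json)

    // ...and the receiving relation can rebuild the exact StructType on the other side.
    val restored = DataType.fromJson(options("metastoreSchema")).asInstanceOf[StructType]
    assert(restored == metastoreSchema)
  }
}
```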
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[String],
schemaInMetastore: StructType,
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore

if (useCached) {
Some(logical)
} else {
// If the cached relation is out of date, we invalidate it right away.
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
}

if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -232,10 +275,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))

val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))

val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
}
}

@@ -58,12 +58,13 @@ case class DropTable(
try {
hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
} catch {
// This table's metadata is not in
// This table's metadata is not in Hive metastore (e.g. the table does not exist).
case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
// Other Throwables can be caused by users providing wrong parameters in OPTIONS
// (e.g. invalid paths). We catch it and log a warning message.
// Users should be able to drop such kinds of tables regardless of whether there is an error.
case e: Throwable => log.warn(s"${e.getMessage}")
case e: Throwable => log.warn(s"${e.getMessage}", e)
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
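The functional change in this hunk is passing the caught Throwable as the second argument to log.warn, so the warning includes the stack trace instead of just the message. A tiny sketch of the difference (plain SLF4J here, outside Spark's Logging trait):

```scala
import org.slf4j.LoggerFactory

object WarnWithCause {
  private val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    try {
      throw new IllegalArgumentException("invalid path in OPTIONS")
    } catch {
      case e: Throwable =>
        log.warn(s"${e.getMessage}")    // message only, no stack trace
        log.warn(s"${e.getMessage}", e) // message plus the full stack trace of e
    }
  }
}
```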
110 changes: 110 additions & 0 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
@@ -390,6 +392,114 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE ms_convert")
}

test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
s"However, $other is returned form the cache.")
}
}

sql("DROP TABLE IF EXISTS test_insert_parquet")
sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")

sql(
"""
|create table test_insert_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")

// First, make sure the converted test_parquet is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
checkCached(tableIdentifer)
// For an insert into a non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

// Create a partitioned table.
sql(
"""
|create table test_parquet_partitioned_cache_test
|(
| intField INT,
| stringField STRING
|)
|PARTITIONED BY (date string)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-01')
|select a, b from jt
""".stripMargin)
// Right now, inserting into a partitioned Parquet table is not supported by the
// data source Parquet relation. So, we expect the converted table is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-02')
|select a, b from jt
""".stripMargin)
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
sql(
"""
|select b, '2015-04-01', a FROM jt
|UNION ALL
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())

invalidateTable("test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

sql("DROP TABLE test_insert_parquet")
sql("DROP TABLE test_parquet_partitioned_cache_test")
}
}

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {