Commit 0c1c0fb

yhuai authored and marmbrus committed
[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit 4b82bd7)
Signed-off-by: Michael Armbrust <michael@databricks.com>
1 parent 0ef46b2 commit 0c1c0fb
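
For orientation, here is a minimal sketch of the pattern this commit introduces: a converted Parquet relation is looked up in a per-table cache, rebuilt and re-cached only on a miss or a mismatch, and refreshTable simply invalidates the entry so the next lookup repopulates it. The Guava Cache usage below mirrors the spirit of cachedDataSourceTables, but TableKey, ConvertedRelation, and lookupOrConvert are hypothetical stand-ins for illustration, not the actual Spark classes.

import com.google.common.cache.{Cache, CacheBuilder}

// Hypothetical stand-ins; the real code caches LogicalRelation values keyed
// by QualifiedTableName inside HiveMetastoreCatalog.
case class TableKey(database: String, table: String)
case class ConvertedRelation(paths: Seq[String], schemaJson: String)

object RelationCacheSketch {
  // An in-memory cache in the spirit of cachedDataSourceTables.
  private val cachedTables: Cache[TableKey, ConvertedRelation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[TableKey, ConvertedRelation]()

  // Reuse the cached relation when it still matches what the metastore reports;
  // otherwise rebuild it and put the fresh copy back into the cache.
  def lookupOrConvert(key: TableKey, paths: Seq[String], schemaJson: String): ConvertedRelation =
    Option(cachedTables.getIfPresent(key)) match {
      case Some(cached) if cached.paths == paths && cached.schemaJson == schemaJson =>
        cached
      case _ =>
        val created = ConvertedRelation(paths, schemaJson)
        cachedTables.put(key, created)
        created
    }

  // refreshTable-style invalidation: drop the entry now and let the next
  // lookup repopulate it lazily.
  def invalidate(key: TableKey): Unit = cachedTables.invalidate(key)
}

The diff below implements this with LogicalRelation/ParquetRelation2 values and adds the full consistency checks (paths, schema type, partition spec) before reusing an entry.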

File tree

2 files changed: 167 additions & 6 deletions

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 55 additions & 6 deletions
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
+    // Next time when we use the table, it will be populated in the cache.
+    invalidateTable(databaseName, tableName)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+            s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+            s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val paths = partitions.map(_.path)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     }
   }
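
The reuse condition inside getCached is the heart of the fix: a cached entry is kept only when the metastore still reports the same paths, a schema of the same type, and an identical partition spec, and any unexpected cached value is invalidated with a warning. Below is a condensed, self-contained restatement of that check under simplified placeholder types (CachedParquetLite and PartitionSpecLite are not real Spark classes, and the sameType helper is only a rough analogue of StructType.sameType).

object CacheReuseCheckSketch {
  // Placeholder shapes standing in for ParquetRelation2 / PartitionSpec.
  case class PartitionSpecLite(partitionColumns: Seq[String], partitionPaths: Seq[String])
  case class CachedParquetLite(
      paths: Seq[String],
      schemaFields: Seq[(String, String)], // (column name, data type)
      maybePartitionSpec: Option[PartitionSpecLite])

  // Keep the cached relation only if paths, schema type, and partition spec
  // all still match what the metastore reports.
  def shouldReuse(
      cached: CachedParquetLite,
      pathsInMetastore: Seq[String],
      schemaInMetastore: Seq[(String, String)],
      partitionSpecInMetastore: Option[PartitionSpecLite]): Boolean = {
    // Rough stand-in for logical.schema.sameType(metastoreSchema): compare
    // lower-cased column names and data types, ignoring nullability details.
    def sameType(a: Seq[(String, String)], b: Seq[(String, String)]): Boolean =
      a.map { case (n, t) => (n.toLowerCase, t) } == b.map { case (n, t) => (n.toLowerCase, t) }

    cached.paths == pathsInMetastore &&
      sameType(cached.schemaFields, schemaInMetastore) &&
      cached.maybePartitionSpec == partitionSpecInMetastore
  }
}

In the actual change, a false result (or a cache miss) falls through to building a fresh ParquetRelation2 and calling cachedDataSourceTables.put, as the two branches in the diff above show.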

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 112 additions & 0 deletions
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.SaveMode
@@ -389,6 +391,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE ms_convert")
   }
+
+  test("Caching converted data source Parquet Relations") {
+    def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+      // Converted test_parquet should be cached.
+      catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
+        case null => fail("Converted test_parquet should be cached in the cache.")
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+        case other =>
+          fail(
+            "The cached test_parquet should be a Parquet Relation. " +
+            s"However, $other is returned from the cache.")
+      }
+    }
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")
+
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
+
+    // First, make sure the converted test_parquet is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    // Table lookup will make the table cached.
+    table("test_insert_parquet")
+    checkCached(tableIdentifer)
+    // For insert into non-partitioned table, we will do the conversion,
+    // so the converted test_insert_parquet should be cached.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_insert_parquet
+        |select a, b from jt
+      """.stripMargin)
+    checkCached(tableIdentifer)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select * from test_insert_parquet"),
+      sql("select a, b from jt").collect())
+    // Invalidate the cache.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+    // Create a partitioned table.
+    sql(
+      """
+        |create table test_parquet_partitioned_cache_test
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |PARTITIONED BY (date string)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-01')
+        |select a, b from jt
+      """.stripMargin)
+    // Right now, insert into a partitioned Parquet table is not supported in data source Parquet.
+    // So, we expect it is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-02')
+        |select a, b from jt
+      """.stripMargin)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+    // Make sure we can cache the partitioned table.
+    table("test_parquet_partitioned_cache_test")
+    checkCached(tableIdentifer)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
+      sql(
+        """
+          |select b, '2015-04-01', a FROM jt
+          |UNION ALL
+          |select b, '2015-04-02', a FROM jt
+        """.stripMargin).collect())
+
+    invalidateTable("test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+    sql("DROP TABLE test_insert_parquet")
+    sql("DROP TABLE test_parquet_partitioned_cache_test")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
