
Commit 5d38f09

fix.
1 parent: 30345c4

3 files changed: 23 additions, 27 deletions

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -213,7 +213,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       bucketSpec = getBucketSpec,
       options = extraOptions.toMap)

-    dataSource.write(mode, df)
+    dataSource.write(mode, df, isForWriteOnly = true)
   }

   /**
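
Judging from the surrounding context, this call sits inside DataFrameWriter.save(), which returns Unit and never exposes the relation that DataSource.write() used to resolve, so requesting the write-only path here changes no caller-visible behavior. A minimal, self-contained sketch (not part of this commit; the output path and app name are hypothetical) of the user-facing call that now takes this path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("write-only-demo").getOrCreate()

// An ordinary save: goes through DataFrameWriter.save(), which after this change
// asks DataSource.write() to skip resolving a relation over the files it just wrote.
spark.range(100).toDF("id")
  .write
  .mode("overwrite")
  .parquet("/tmp/write_only_demo")

spark.stop()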

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 15 additions & 4 deletions
@@ -413,10 +413,16 @@ case class DataSource(
     relation
   }

-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to exit early and just write the data without returning
+   *                       a [[BaseRelation]].
+   */
   def write(
       mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+      data: DataFrame,
+      isForWriteOnly: Boolean = false): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -494,8 +500,13 @@ case class DataSource(
           catalogTable = catalogTable,
           fileIndex = fileIndex)
         sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        if (isForWriteOnly) {
+          // Exit earlier and return null
+          null
+        } else {
+          // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
+          copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        }

       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
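
A rough sketch of what the new parameter means for callers of DataSource.write(), assuming a spark-shell-style session named spark and a hypothetical scratch path; the DataSource construction below is illustrative and not taken from this commit. Note that the write-only call intentionally returns null, so its result must be discarded:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

val df = spark.range(10).toDF("id")
val dataSource = DataSource(
  sparkSession = spark,
  className = "parquet",
  options = Map("path" -> "/tmp/datasource_write_demo"))

// Write-only path (what DataFrameWriter.save() now requests): the data is written,
// no relation is resolved afterwards, and the return value is null by design.
dataSource.write(SaveMode.Overwrite, df, isForWriteOnly = true)

// Default path: the written data is resolved back into a BaseRelation, reusing the
// DataFrame's schema (made nullable) instead of re-inferring it from the output files.
val relation: BaseRelation = dataSource.write(SaveMode.Overwrite, df)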

sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala

Lines changed: 7 additions & 22 deletions
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
   }

   private def setupPartitionedHiveTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)

-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }

   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)

-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedDatasourceTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          HiveCatalogMetrics.reset()
+          setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
       withTable("test") {
         withTempDir { dir =>
           HiveCatalogMetrics.reset()
-          setupPartitionedHiveTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
       })
       executorPool.shutdown()
       executorPool.awaitTermination(30, TimeUnit.SECONDS)
-      // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
-      // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
-      // only one thread can really do the build, so the listing job count is 2, the other
-      // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
-      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
-      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
+      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
     }
   }
 }
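
Why the expected counts drop, as far as can be inferred from this diff alone (the full test body is not shown): the table appears to span 50 partition directories, and before this change the write that sets it up was followed by relation resolution over the freshly written files, so the directories were effectively listed twice (the removed comment's partition_num * 2 = 100 files discovered and 2 parallel listing jobs). With the write-only early exit, only a single listing remains: 50 files discovered and 1 listing job.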
