Commit b48ced1

update
1 parent 34a9a25 commit b48ced1

5 files changed: +43 additions, −28 deletions

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions

@@ -1180,6 +1180,13 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD =
+    ConfigBuilder("spark.io.file.unsplittable.warning.threshold")
+      .doc("When Spark loads a single large unsplittable file, log a warning if the file " +
+        "size exceeds this threshold.")
+      .longConf
+      .createWithDefault(1024 * 1024 * 1024)
+
   private[spark] val EVENT_LOG_COMPRESSION_CODEC =
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 9 additions & 17 deletions

@@ -41,7 +41,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -199,21 +199,7 @@ class HadoopRDD[K, V](
 
   private val UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD = 1024 * 1024 * 1024
 
-  @transient private lazy val compressionCodecs = new CompressionCodecFactory(getJobConf())
-
-  private def checkAndLogUnsplittableLargeFile(split: InputSplit): Unit = {
-    if (split.isInstanceOf[FileSplit]) {
-      val fileSplit = split.asInstanceOf[FileSplit]
-      val path = fileSplit.getPath
-      val codec = compressionCodecs.getCodec(path)
-      if (codec != null && !codec.isInstanceOf[SplittableCompressionCodec]) {
-        if (fileSplit.getLength > UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD) {
-          logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-            s"rdd partition have to deal with the whole file and consume large time.")
-        }
-      }
-    }
-  }
+  @transient private lazy val codecFactory = new CompressionCodecFactory(getJobConf())
 
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
@@ -227,7 +213,13 @@ class HadoopRDD[K, V](
       allInputSplits
     }
     if (inputSplits.length == 1) {
-      checkAndLogUnsplittableLargeFile(inputSplits(0))
+      val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
+      val path = fileSplit.getPath
+      if (!Utils.isFileSplittable(path, codecFactory)
+          && fileSplit.getLength > conf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
+        logWarning(s"File ${path.toString} is large and unsplittable, so the corresponding " +
+          s"RDD partition has to process the whole file, which can be slow.")
+      }
     }
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
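For illustration, a sketch of the situation this warning targets (the input path is hypothetical): a single gzip-compressed file read through the RDD API ends up in one partition because gzip is not a splittable codec, no matter what minPartitions asks for, so getPartitions now logs the warning once the file exceeds the threshold.

// Hypothetical input path; gzip is unsplittable, so the whole file becomes one task.
val lines = sc.textFile("hdfs:///data/events/huge-events.json.gz", minPartitions = 64)
println(lines.getNumPartitions)  // 1 -- and the driver logs the new warning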

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 0 deletions

@@ -51,6 +51,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.eclipse.jetty.util.MultiException
@@ -2895,6 +2896,12 @@ private[spark] object Utils extends Logging {
   def isLocalUri(uri: String): Boolean = {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
+
+  /** Check whether the file of the path is splittable. */
+  def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
+    val codec = codecFactory.getCodec(path)
+    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  }
 }
 
 private[util] object CallerContext extends Logging {
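A small sketch of what the new helper returns for a few common extensions. It is private[spark], so this only compiles from code inside an org.apache.spark package, and the paths are made up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val codecFactory = new CompressionCodecFactory(new Configuration())
// No codec is registered for plain text, so getCodec returns null => splittable.
Utils.isFileSplittable(new Path("/data/part-00000.txt"), codecFactory)  // true
// GzipCodec does not implement SplittableCompressionCodec => not splittable.
Utils.isFileSplittable(new Path("/data/part-00000.gz"), codecFactory)   // false
// BZip2Codec implements SplittableCompressionCodec => still splittable.
Utils.isFileSplittable(new Path("/data/part-00000.bz2"), codecFactory)  // true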

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 15 additions & 1 deletion

@@ -21,6 +21,8 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.config.IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -36,7 +38,9 @@ abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
+    readPartitionSchema: StructType)
+  extends Scan
+    with Batch with SupportsReportStatistics with Logging {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -91,6 +95,16 @@ abstract class FileScan(
        )
      }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    }
+
+    if (splitFiles.length == 1) {
+      val path = new Path(splitFiles(0).filePath)
+      if (!isSplitable(path) && splitFiles(0).length >
+          sparkSession.sparkContext.getConf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
+        logWarning(s"File ${path.toString} is large and unsplittable, so the corresponding " +
+          s"RDD partition has to process the whole file, which can be slow.")
+      }
+    }
+
    FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
  }
 
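The same situation seen from the DataFrame side of the v2 read path; whether FileScan handles a given source depends on how the v2 sources are configured in the running Spark build, and the bucket path here is invented:

// A single large .csv.gz input cannot be split, so planning produces one partition
// and FileScan now logs the warning while building its partitions.
val df = spark.read.option("header", "true").csv("s3a://my-bucket/exports/huge.csv.gz")
println(df.rdd.getNumPartitions)  // 1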

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala

Lines changed: 5 additions & 10 deletions

@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class TextBasedFileScan(
     sparkSession: SparkSession,
@@ -33,14 +34,8 @@ abstract class TextBasedFileScan(
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
-  private var codecFactory: CompressionCodecFactory = _
+  @transient private lazy val codecFactory: CompressionCodecFactory = new CompressionCodecFactory(
+    sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
 
-  override def isSplitable(path: Path): Boolean = {
-    if (codecFactory == null) {
-      codecFactory = new CompressionCodecFactory(
-        sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
-    }
-    val codec = codecFactory.getCodec(path)
-    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
-  }
+  override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
 }
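A note on the refactor above: replacing the mutable codecFactory var with a @transient private lazy val keeps the initialize-on-first-use behaviour without the explicit null check, and the usual reason for @transient here is to keep the Hadoop CompressionCodecFactory, which is not meant to be serialized, out of any serialized closure that captures the scan.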
