
Commit 26d03b6

WeichenXu123 authored and cloud-fan committed
[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file
## What changes were proposed in this pull request?

Log a warning in the driver when loading a single large unsplittable file via `sc.textFile` or the csv/json datasources. The conditions that trigger the logging are:

* only one partition is generated
* the file is unsplittable, with possible reasons being:
  - it is compressed by an unsplittable compression algorithm such as gzip
  - multiLine mode in the csv/json datasource
  - wholeText mode in the text datasource
* the file size exceeds the config threshold `spark.io.warning.largeFileThreshold` (default value is 1GB)

## How was this patch tested?

Manually tested. Generate one gzip file exceeding 1GB:

```
base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
```

then launch spark-shell and run

```
sc.textFile("file:///path/to/file1.gz").count()
```

which prints a log like:

```
WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec
```

Run

```
sc.textFile("file:///path/to/file1.txt").count()
```

which prints a log like:

```
WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile
```

Run

```
spark.read.csv("file:///path/to/file1.gz").count
```

which prints a log like:

```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec
```

Run

```
spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count
```

which prints a log like:

```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode
```

The JSON and Text datasources were also tested with similar cases.

Closes #25134 from WeichenXu123/log_gz.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent ee41001 commit 26d03b6

File tree

8 files changed: +93 -14 lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 0 deletions
@@ -1204,6 +1204,14 @@ package object config {
     .intConf
     .createWithDefault(1)

+  private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
+    ConfigBuilder("spark.io.warning.largeFileThreshold")
+      .internal()
+      .doc("When spark loading one single large file, if file size exceed this " +
+        "threshold, then log warning with possible reasons.")
+      .longConf
+      .createWithDefault(1024 * 1024 * 1024)
+
   private[spark] val EVENT_LOG_COMPRESSION_CODEC =
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 17 additions & 1 deletion
@@ -25,6 +25,7 @@ import scala.collection.immutable.Map
 import scala.reflect.ClassTag

 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
@@ -40,7 +41,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}

 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -207,6 +208,21 @@ class HadoopRDD[K, V](
       } else {
         allInputSplits
       }
+      if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
+        val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
+        val path = fileSplit.getPath
+        if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+          val codecFactory = new CompressionCodecFactory(jobConf)
+          if (Utils.isFileSplittable(path, codecFactory)) {
+            logWarning(s"Loading one large file ${path.toString} with only one partition, " +
+              s"we can increase partition numbers by the `minPartitions` argument in method " +
+              "`sc.textFile`")
+          } else {
+            logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+              s"partition, because the file is compressed by unsplittable compression codec.")
+          }
+        }
+      }
       val array = new Array[Partition](inputSplits.size)
       for (i <- 0 until inputSplits.size) {
         array(i) = new HadoopPartition(id, i, inputSplits(i))
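For the splittable branch, the warning points the user at `minPartitions`. A small usage sketch of that suggestion, assuming a spark-shell session where `sc` is in scope and the path is a placeholder:

```scala
// Ask sc.textFile for more input partitions up front so a large uncompressed
// file is not read as a single partition.
val rdd = sc.textFile("file:///path/to/file1.txt", minPartitions = 16)
println(rdd.getNumPartitions) // typically >= 16 for a multi-GB splittable file
```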

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 0 deletions
@@ -51,6 +51,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.eclipse.jetty.util.MultiException
@@ -2901,6 +2902,12 @@ private[spark] object Utils extends Logging {
   def isLocalUri(uri: String): Boolean = {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
+
+  /** Check whether the file of the path is splittable. */
+  def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
+    val codec = codecFactory.getCodec(path)
+    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  }
 }

 private[util] object CallerContext extends Logging {
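The helper only asks Hadoop's codec factory whether the path's extension maps to a splittable codec. A standalone sketch of the same check outside Spark (the paths are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

val codecFactory = new CompressionCodecFactory(new Configuration())

def isFileSplittable(path: Path): Boolean = {
  val codec = codecFactory.getCodec(path)
  // No codec means plain (splittable) data; otherwise the codec itself must
  // advertise splittability (BZip2Codec does, GzipCodec does not).
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}

println(isFileSplittable(new Path("/data/file1.txt"))) // true  (no codec)
println(isFileSplittable(new Path("/data/file1.gz")))  // false (GzipCodec)
println(isFileSplittable(new Path("/data/file1.bz2"))) // true  (BZip2Codec)
```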

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 24 additions & 3 deletions
@@ -21,29 +21,40 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path

+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils

 abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
+    readPartitionSchema: StructType)
+  extends Scan
+  with Batch with SupportsReportStatistics with Logging {
   /**
    * Returns whether a file with `path` could be split or not.
    */
   def isSplitable(path: Path): Boolean = {
     false
   }

+  /**
+   * If a file with `path` is unsplittable, return the unsplittable reason,
+   * otherwise return `None`.
+   */
+  def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    "undefined"
+  }
+
   override def description(): String = {
     val locationDesc =
       fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
@@ -91,6 +102,16 @@ abstract class FileScan(
         )
       }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
     }
+
+    if (splitFiles.length == 1) {
+      val path = new Path(splitFiles(0).filePath)
+      if (!isSplitable(path) && splitFiles(0).length >
+        sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+      }
+    }
+
     FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala

Lines changed: 9 additions & 9 deletions
@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._

 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+import org.apache.hadoop.io.compress.CompressionCodecFactory

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils

 abstract class TextBasedFileScan(
     sparkSession: SparkSession,
@@ -33,14 +34,13 @@ abstract class TextBasedFileScan(
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
-  private var codecFactory: CompressionCodecFactory = _
+  @transient private lazy val codecFactory: CompressionCodecFactory = new CompressionCodecFactory(
+    sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))

-  override def isSplitable(path: Path): Boolean = {
-    if (codecFactory == null) {
-      codecFactory = new CompressionCodecFactory(
-        sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
-    }
-    val codec = codecFactory.getCodec(path)
-    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
+
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    "the file is compressed by unsplittable compression codec"
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala

Lines changed: 10 additions & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
+import org.apache.spark.sql.execution.datasources.csv.{CSVDataSource, MultiLineCSVDataSource}
 import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -50,6 +50,15 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }

+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      "the csv datasource is set multiLine mode"
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
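As a usage note, the multiLine reason is reached through the ordinary reader option; a hedged example with a placeholder path, assuming a spark-shell session where `spark` is in scope:

```scala
// multiLine CSV parsing is unsplittable, so a file over the threshold now
// logs the CSVScan warning with the "set multiLine mode" reason.
spark.read.option("multiLine", true).csv("file:///path/to/large.csv").count()
```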

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala

Lines changed: 9 additions & 0 deletions
@@ -50,6 +50,15 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }

+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      "the json datasource is set multiLine mode"
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
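The JSON path behaves the same way; an illustrative trigger with a placeholder path:

```scala
// multiLine JSON (records spanning multiple lines) is likewise unsplittable,
// so a large enough file logs the JsonScan-specific reason.
spark.read.option("multiLine", true).json("file:///path/to/large.json").count()
```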

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala

Lines changed: 9 additions & 0 deletions
@@ -44,6 +44,15 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }

+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      "the text datasource is set wholetext mode"
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     assert(
       readDataSchema.length <= 1,
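For the text source, wholetext mode is the trigger; an illustrative read with a placeholder path:

```scala
// wholetext reads each file as a single row, which is inherently unsplittable,
// so a large file logs the TextScan-specific reason.
spark.read.option("wholetext", true).text("file:///path/to/large.txt").count()
```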
