[SPARK-23029] [DOCS] Specifying default units of configuration entries #20269

Closed
wants to merge 4 commits
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -640,9 +640,9 @@ private[spark] object SparkConf extends Logging {
translation = s => s"${s.toLong * 10}s")),
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
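The AlternateConfig entries above pair each current key with its deprecated predecessor, and the optional translation function rewrites the old value so it keeps its meaning under the new key's default unit. A minimal sketch of that behaviour (the TranslationSketch object and translate helper are illustrative only, not Spark internals):

```scala
object TranslationSketch {
  // Apply a deprecated key's translation, if any, to the value the user supplied.
  def translate(oldValue: String, translation: Option[String => String]): String =
    translation.map(_.apply(oldValue)).getOrElse(oldValue)

  def main(args: Array[String]): Unit = {
    // Legacy "spark.kryoserializer.buffer.mb" took a fractional MB value; the new
    // "spark.kryoserializer.buffer" expects a size string, so 0.064 MB becomes "64k".
    val kryoBuffer: String => String = s => s"${(s.toDouble * 1000).toInt}k"
    println(translate("0.064", Some(kryoBuffer))) // prints 64k

    // "spark.reducer.maxMbInFlight" has no translation: the bare number carries over,
    // and the new key's default unit (MiB) preserves its meaning.
    println(translate("48", None)) // prints 48
  }
}
```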
47 changes: 27 additions & 20 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -38,10 +38,13 @@ package object config {
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)

private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
.doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
.doc("The amount of off-heap memory to be allocated per driver in cluster mode, " +
Member:
Hi, @ferdonline, can you explain why this is off-heap memory?
"in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createOptional
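Both memory entries above are declared with bytesConf(ByteUnit.MiB), which is what the added "in MiB unless otherwise specified" wording documents: a bare number is parsed in MiB, while an explicit suffix overrides the default unit. A small sketch of the effect, assuming only a local SparkConf (no cluster needed):

```scala
import org.apache.spark.SparkConf

object DefaultUnitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)

    // A bare number is interpreted in the entry's default unit, MiB for driver memory.
    conf.set("spark.driver.memory", "2048")
    println(conf.getSizeAsMb("spark.driver.memory")) // 2048

    // An explicit suffix overrides the default unit.
    conf.set("spark.driver.memory", "2g")
    println(conf.getSizeAsMb("spark.driver.memory")) // 2048
  }
}
```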

@@ -62,6 +65,7 @@
.createWithDefault(false)

private[spark] val EVENT_LOG_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.eventLog.buffer.kb")
.doc("Buffer size to use when writing to output streams, in KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("100k")

@@ -81,10 +85,13 @@
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)

private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.doc("Amount of memory to use per executor process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
.doc("The amount of off-heap memory to be allocated per executor in cluster mode, " +
"in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createOptional

@@ -353,7 +360,7 @@
private[spark] val BUFFER_WRITE_CHUNK_SIZE =
ConfigBuilder("spark.buffer.write.chunkSize")
.internal()
.doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
.doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not larger than Int.MaxValue.")
@@ -368,9 +375,9 @@

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
ConfigBuilder("spark.shuffle.accurateBlockThreshold")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config. This helps to prevent OOM by " +
"avoiding underestimating shuffle block size when fetch shuffle blocks.")
.doc("Threshold in bytes above which the size of shuffle blocks in " +
"HighlyCompressedMapStatus is accurately recorded. This helps to prevent OOM " +
"by avoiding underestimating shuffle block size when fetch shuffle blocks.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)
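Because this entry uses bytesConf(ByteUnit.BYTE), the clarified doc's "threshold in bytes" means a bare number is read as bytes, while a suffixed value is still accepted. A short sketch with a local SparkConf only:

```scala
import org.apache.spark.SparkConf

object ByteDefaultSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)

    // A bare number is read as bytes for a bytesConf(ByteUnit.BYTE) entry...
    conf.set("spark.shuffle.accurateBlockThreshold", "104857600")
    println(conf.getSizeAsBytes("spark.shuffle.accurateBlockThreshold")) // 104857600

    // ...and a suffixed value such as "100m" resolves to the same 100 MiB threshold.
    conf.set("spark.shuffle.accurateBlockThreshold", "100m")
    println(conf.getSizeAsBytes("spark.shuffle.accurateBlockThreshold")) // 104857600
  }
}
```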

@@ -389,23 +396,23 @@

private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
.doc("This configuration limits the number of remote blocks being fetched per reduce task" +
" from a given host port. When a large number of blocks are being requested from a given" +
" address in a single fetch or simultaneously, this could crash the serving executor or" +
" Node Manager. This is especially useful to reduce the load on the Node Manager when" +
" external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.doc("This configuration limits the number of remote blocks being fetched per reduce task " +
"from a given host port. When a large number of blocks are being requested from a given " +
"address in a single fetch or simultaneously, this could crash the serving executor or " +
"Node Manager. This is especially useful to reduce the load on the Node Manager when " +
"external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.intConf
.checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
.createWithDefault(Int.MaxValue)

private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
.doc("Remote block will be fetched to disk when size of the block is " +
"above this threshold. This is to avoid a giant request takes too much memory. We can " +
"enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
"affect both shuffle fetch and block manager remote block fetch. For users who " +
"enabled external shuffle service, this feature can only be worked when external shuffle" +
" service is newer than Spark 2.2.")
.doc("Remote block will be fetched to disk when size of the block is above this threshold " +
"in bytes. This is to avoid a giant request takes too much memory. We can enable this " +
"config by setting a specific value(e.g. 200m). Note this configuration will affect " +
"both shuffle fetch and block manager remote block fetch. For users who enabled " +
"external shuffle service, this feature can only be worked when external shuffle" +
"service is newer than Spark 2.2.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
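A minimal sketch of enabling the fetch-to-disk threshold described above, using the "200m" example value the doc itself mentions; only a local SparkConf is assumed:

```scala
import org.apache.spark.SparkConf

object FetchToDiskSketch {
  def main(args: Array[String]): Unit = {
    // With this setting, remote blocks larger than ~200 MiB are fetched to disk
    // instead of being buffered in memory.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.maxRemoteBlockSizeFetchToMem", "200m")

    println(conf.getSizeAsBytes("spark.maxRemoteBlockSizeFetchToMem")) // 209715200
  }
}
```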

@@ -419,9 +426,9 @@

private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.file.buffer")
.doc("Size of the in-memory buffer for each shuffle file output stream. " +
"These buffers reduce the number of disk seeks and system calls made " +
"in creating intermediate shuffle files.")
.doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
"otherwise specified. These buffers reduce the number of disk seeks and system calls " +
"made in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
@@ -430,15 +437,15 @@
private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer.")
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size to use when writing the sorted records to an on-disk file.")
.doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v > 0 && v <= Int.MaxValue,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")