
Commit 5c1f201 (parent: 692e3dd)

Commit message: fix

8 files changed: 11 additions & 10 deletions


core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 2 deletions
@@ -683,8 +683,9 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.akka.frameSize", "1.6")),
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
-    MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
-      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
+    NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"),
+      AlternateConfig("spark.maxRemoteBlockSizeFetchToMem", "3.0")),
     LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
       AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
     DRIVER_MEMORY_OVERHEAD.key -> Seq(
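
The added AlternateConfig entry is what keeps the pre-3.0 key working: when the new key is read, SparkConf falls back to any alternate key that is set and logs a deprecation warning. A minimal sketch of that fallback, assuming only spark-core on the classpath (the object name is hypothetical):

import org.apache.spark.SparkConf

object AlternateKeyDemo {
  def main(args: Array[String]): Unit = {
    // Set only the deprecated key...
    val conf = new SparkConf()
      .set("spark.maxRemoteBlockSizeFetchToMem", "100m")
    // ...then read it back under the new key; the AlternateConfig mapping
    // above resolves the lookup, so this is expected to print "100m".
    println(conf.get("spark.network.maxRemoteBlockSizeFetchToMem"))
  }
}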

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 2 deletions
@@ -894,8 +894,8 @@ package object config {
       .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
       .createWithDefault(Int.MaxValue)

-  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
-    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
+  private[spark] val NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
+    ConfigBuilder("spark.network.maxRemoteBlockSizeFetchToMem")
       .doc("Remote block will be fetched to disk when size of the block is above this threshold " +
         "in bytes. This is to avoid a giant request takes too much memory. Note this " +
         "configuration will affect both shuffle fetch and block manager remote block fetch. " +

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ private[spark] class NettyBlockTransferService(
     // Everything else is encoded using our binary protocol.
     val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))

-    val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+    val asStream = blockData.size() > conf.get(config.NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
     val callback = new RpcResponseCallback {
       override def onSuccess(response: ByteBuffer): Unit = {
         logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
       SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
       SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
-      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
+      SparkEnv.get.conf.get(config.NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
       SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
       SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT_MEMORY),
       readMetrics,

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -246,7 +246,7 @@ private[spark] class BlockManager(
   // Exposed for test
   private[storage] val remoteBlockTempFileManager =
     new BlockManager.RemoteBlockDownloadFileManager(this)
-  private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+  private val maxRemoteBlockToMem = conf.get(config.NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

   var hostLocalDirManager: Option[HostLocalDirManager] = None

core/src/test/scala/org/apache/spark/DistributedSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -165,7 +165,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     // also try with block replication as a stream
     val uploadStreamConf = new SparkConf()
     uploadStreamConf.setAll(conf.getAll)
-    uploadStreamConf.set(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L)
+    uploadStreamConf.set(config.NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L)
    test(s"$testName (with replication as stream)") {
      testCaching(uploadStreamConf, storageLevel)
    }

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1658,7 +1658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }

   test("fetch remote block to local disk if block size is larger than threshold") {
-    conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1000L)
+    conf.set(NETWORK_MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1000L)

     val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
     val mockBlockTransferService = new MockBlockTransferService(0)

docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -1810,7 +1810,7 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.maxRemoteBlockSizeFetchToMem</code></td>
+  <td><code>spark.network.maxRemoteBlockSizeFetchToMem</code></td>
   <td>200m</td>
   <td>
     Remote block will be fetched to disk when size of the block is above this threshold
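
For end users the change is just the key name: values remain byte sizes with a 200m default, and the old key keeps working through the deprecation mapping above. A hedged usage sketch (the app name and 100m value are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("fetch-to-disk-demo") // hypothetical app name
  .set("spark.network.maxRemoteBlockSizeFetchToMem", "100m")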
