Commit 601a83f

AnnaSavarin authored and William Montaz committed
Merge pull request apache#77 from vincent-grosbois/criteo-2.2
[SPARK-24917] make chunk size configurable
1 parent: 1197ed8 · commit: 601a83f

File tree

1 file changed: +3 −1 lines changed

core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala

Lines changed: 3 additions & 1 deletion
@@ -70,6 +70,8 @@ private[spark] class SerializerManager(
   private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
   // Whether to compress shuffle output temporarily spilled to disk
   private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
+  // Size of the chunks to be used in the ChunkedByteBuffer
+  private[this] val chunkSizeMb = conf.getSizeAsMb("spark.memory.chunkSize", "4m").toInt

   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
    * the initialization of the compression codec until it is first used. The reason is that a Spark
@@ -186,7 +188,7 @@ private[spark] class SerializerManager(
       blockId: BlockId,
       values: Iterator[_],
       classTag: ClassTag[_]): ChunkedByteBuffer = {
-    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
+    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * chunkSizeMb, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
     val ser = getSerializer(classTag, autoPick).newInstance()
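With this change, the chunk size used when serializing block data is read from the spark.memory.chunkSize setting (parsed by conf.getSizeAsMb, defaulting to "4m") instead of being hard-coded at 4 MB. A minimal usage sketch, assuming a build that includes this commit; the key comes from this branch (criteo-2.2) and may not exist in other Spark builds, and the app name and master URL are illustrative only:

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the serialization chunk size from the 4 MB default to 16 MB.
    // getSizeAsMb accepts size suffixes, so "16m" parses to 16.
    val conf = new SparkConf()
      .setAppName("chunk-size-demo")         // hypothetical app name
      .setMaster("local[*]")                 // hypothetical master URL
      .set("spark.memory.chunkSize", "16m")

    val sc = new SparkContext(conf)
    // Blocks serialized through SerializerManager are now built from 16 MB
    // chunks, fewer but larger ByteBuffer allocations for big blocks.

The same value can be supplied at submit time with --conf spark.memory.chunkSize=16m.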
