
Commit 6006856

Fix overflow issues
1 parent 006b4b2 commit 6006856

2 files changed: +10 additions, -11 deletions

core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala

Lines changed: 10 additions & 8 deletions
@@ -28,8 +28,10 @@ import scala.collection.mutable.ArrayBuffer
  * occupy a contiguous segment of memory.
  */
 private[spark] class ChainedBuffer(chunkSize: Int) {
-  private val chunkSizeLog2 = (math.log(chunkSize) / math.log(2)).toInt
-  assert(math.pow(2, chunkSizeLog2).toInt == chunkSize,
+
+  private val chunkSizeLog2: Int = java.lang.Long.numberOfTrailingZeros(
+    java.lang.Long.highestOneBit(chunkSize))
+  assert((1 << chunkSizeLog2) == chunkSize,
     s"ChainedBuffer chunk size $chunkSize must be a power of two")
   private val chunks: ArrayBuffer[Array[Byte]] = new ArrayBuffer[Array[Byte]]()
   private var _size: Long = 0
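
Note on the chunkSizeLog2 change: the old expression computed log2 through floating-point math.log and relied on math.pow to round-trip back, while the new one stays in integer bit arithmetic (numberOfTrailingZeros of highestOneBit is exact for any power of two), and the new assert checks the round trip with a plain shift. A minimal standalone sketch, not Spark code, contrasting the two:

object Log2Sketch {
  // Old approach: floating-point log, then truncation.
  def log2Float(n: Int): Int = (math.log(n) / math.log(2)).toInt

  // New approach from the patch: pure integer bit arithmetic, exact for powers of two.
  def log2Bits(n: Int): Int =
    java.lang.Long.numberOfTrailingZeros(java.lang.Long.highestOneBit(n))

  def main(args: Array[String]): Unit = {
    val chunkSize = 1 << 20                     // 1 MiB chunk, a power of two
    println(log2Float(chunkSize))               // 20, but depends on FP rounding
    println(log2Bits(chunkSize))                // 20, computed without floating point

    // The patched assert, expressed as a predicate:
    def isPowerOfTwo(n: Int): Boolean = (1 << log2Bits(n)) == n
    println(isPowerOfTwo(chunkSize))            // true
    println(isPowerOfTwo(3000))                 // false -- would trip the assert
  }
}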
@@ -47,7 +49,7 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
         s"Read of $len bytes at position $pos would go past size ${_size} of buffer")
     }
     var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
-    var posInChunk: Int = (pos - (chunkIndex << chunkSizeLog2)).toInt
+    var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
     var written: Int = 0
     while (written < len) {
       val toRead: Int = math.min(len - written, chunkSize - posInChunk)
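
The .toLong widening is the heart of the overflow fix: chunkIndex is an Int, so chunkIndex << chunkSizeLog2 is a 32-bit shift, and the chunk's start offset wraps negative once positions pass 2 GB; shifting the widened Long keeps that intermediate offset exact. A standalone sketch with hypothetical chunk size and position, not Spark code:

object ShiftWideningSketch {
  def main(args: Array[String]): Unit = {
    val chunkSizeLog2 = 12                          // hypothetical 4 KiB chunks
    val pos: Long = 3L * 1024 * 1024 * 1024 + 100   // position ~3 GB into the buffer
    val chunkIndex: Int = (pos >> chunkSizeLog2).toInt

    // 32-bit shift: the chunk's start offset no longer fits in an Int and wraps negative.
    val startAsInt: Int = chunkIndex << chunkSizeLog2
    // 64-bit shift after widening, as in the patch: the true byte offset of the chunk.
    val startAsLong: Long = chunkIndex.toLong << chunkSizeLog2

    println(startAsInt)         // -1073741824, the wrapped 32-bit value
    println(startAsLong)        // 3221225472, the real chunk start
    println(pos - startAsLong)  // 100, the offset within the chunk
  }
}

The same widening is applied in the second read path and in the write path in the hunks below.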
@@ -72,7 +74,7 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
         s"Read of $len bytes at position $pos would go past size of buffer")
     }
     var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
-    var posInChunk: Int = (pos - (chunkIndex << chunkSizeLog2)).toInt
+    var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
     var written: Int = 0
     while (written < len) {
       val toRead: Int = math.min(len - written, chunkSize - posInChunk)
@@ -102,9 +104,9 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
       chunks += new Array[Byte](chunkSize)
     }
 
-    var chunkIndex = (pos >> chunkSizeLog2).toInt
-    var posInChunk = (pos - (chunkIndex << chunkSizeLog2)).toInt
-    var written = 0
+    var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
+    var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
+    var written: Int = 0
     while (written < len) {
       val toWrite: Int = math.min(len - written, chunkSize - posInChunk)
       System.arraycopy(bytes, offs + written, chunks(chunkIndex), posInChunk, toWrite)
@@ -119,7 +121,7 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
   /**
    * Total size of buffer that can be written to without allocating additional memory.
    */
-  def capacity: Long = chunks.size * chunkSize
+  def capacity: Long = chunks.size.toLong * chunkSize
 
   /**
    * Size of the logical buffer.
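
The capacity fix follows the same pattern: chunks.size and chunkSize are both Int, so their product is evaluated in 32-bit arithmetic and wraps before being widened to the Long result; promoting one operand with .toLong makes the multiplication 64-bit. A sketch with hypothetical numbers:

object CapacitySketch {
  def main(args: Array[String]): Unit = {
    val numChunks: Int = 1024               // hypothetical chunk count
    val chunkSize: Int = 4 * 1024 * 1024    // hypothetical 4 MiB chunks, 4 GiB total

    val wrong: Long = numChunks * chunkSize          // Int * Int wraps before widening
    val right: Long = numChunks.toLong * chunkSize   // multiplication done in 64 bits

    println(wrong)   // 0
    println(right)   // 4294967296
  }
}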

core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala

Lines changed: 0 additions & 3 deletions
@@ -75,9 +75,6 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
     }
 
     val keyStart = kvBuffer.size
-    if (keyStart < 0) {
-      throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes")
-    }
     kvSerializationStream.writeKey[Any](key)
     kvSerializationStream.writeValue[Any](value)
     kvSerializationStream.flush()
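
The removed guard appears to be dead code once sizes are tracked as Long: kvBuffer is a ChainedBuffer whose _size is a Long (see the context in the first hunk above), so keyStart cannot wrap negative here. The old message also had its own overflow, since 1 << 31 is a 32-bit shift that yields Int.MinValue rather than 2^31. A tiny illustrative snippet:

object ShiftLiteralSketch {
  def main(args: Array[String]): Unit = {
    println(1 << 31)    // -2147483648: the Int shift wraps to Int.MinValue
    println(1L << 31)   // 2147483648: widening the left operand gives the intended 2^31
  }
}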
