[SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value #21175


Status: Closed (wants to merge 11 commits)
@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    */
   def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
-      while (bytes.remaining() > 0) {
-        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position() + ioSize)
-        channel.write(bytes)
+      val curChunkLimit = bytes.limit()
+      while (bytes.hasRemaining) {
+        try {
+          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+          bytes.limit(bytes.position() + ioSize)

Contributor: The rationale for the limit() isn't super clear, but that was a problem in the original PR which introduced the bug (#18730). I'm commenting here only as a cross-reference for folks who come across this patch in the future. I believe the original motivation was http://www.evanjones.ca/java-bytebuffer-leak.html

+          channel.write(bytes)
+        } finally {

Contributor: I don't think we need the try and finally here, because getChunks() returns duplicated ByteBuffers which have their own position and limit.
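
A minimal sketch of the duplicate() semantics this comment relies on (illustrative Scala, not part of the patch): duplicate() shares the underlying bytes but gives each view its own independent position and limit.

    import java.nio.ByteBuffer

    val original = ByteBuffer.allocate(8)
    val dup = original.duplicate()   // shares content, independent cursors
    dup.limit(4)                     // shrink only the duplicate's view
    assert(original.limit() == 8)    // the original buffer is untouched
    assert(dup.limit() == 4)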

Contributor: I think the problem is that bytes.limit(bytes.position() + ioSize) changes the result of bytes.hasRemaining, so we have to restore the limit on each loop iteration.
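
A small repro of the failure mode described above (illustrative sizes; the real code caps each write at bufferWriteChunkSize):

    import java.nio.ByteBuffer

    val buf = ByteBuffer.allocate(8)   // stand-in for a 40 MiB chunk
    buf.limit(buf.position() + 4)      // expose a 4-byte "chunk" for one write
    buf.position(4)                    // simulate channel.write() consuming it
    assert(!buf.hasRemaining)          // limit never restored: the loop would stop
    buf.limit(8)                       // restoring the limit...
    assert(buf.hasRemaining)           // ...exposes the bytes still to be written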

Contributor: I get your point: if there is an exception, there is no next loop iteration, so we don't need to restore the limit, and the try/finally is not needed.

+          bytes.limit(curChunkLimit)
+        }
       }
     }
   }
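
Taking the two comments above together, a sketch of the loop without the try/finally (assuming it lives inside ChunkedByteBuffer, where getChunks() and bufferWriteChunkSize are in scope; this reflects the reviewers' suggestion, not necessarily the merged patch):

    import java.nio.channels.WritableByteChannel

    def writeFully(channel: WritableByteChannel): Unit = {
      for (bytes <- getChunks()) {
        val curChunkLimit = bytes.limit()
        while (bytes.hasRemaining) {
          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
          bytes.limit(bytes.position() + ioSize)
          channel.write(bytes)
          bytes.limit(curChunkLimit)  // restore before re-testing hasRemaining
        }
      }
    }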

@@ -21,11 +21,12 @@ import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams

Member: Add an empty line after line 22 to separate the Spark imports from the third-party group.
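
For reference, the grouping the comment asks for would look like this (sketch, using the imports from this diff):

    import java.nio.ByteBuffer

    import com.google.common.io.ByteStreams

    import org.apache.spark.{SharedSparkContext, SparkFunSuite}
    import org.apache.spark.internal.config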


-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.util.io.ChunkedByteBuffer
 
-class ChunkedByteBufferSuite extends SparkFunSuite {
+class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {

test("no chunks") {
val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
Expand Down Expand Up @@ -56,6 +57,18 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }

test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
try {
sc.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 32L * 1024L * 1024L)
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(40 * 1024 * 1024)))
val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
chunkedByteBuffer.writeFully(byteArrayWritableChannel)
assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size)
} finally {
sc.conf.remove(config.BUFFER_WRITE_CHUNK_SIZE)
}
}
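
The test sizes matter here: with BUFFER_WRITE_CHUNK_SIZE forced to 32 MiB, writing the 40 MiB buffer takes two loop iterations. Under the old code the first iteration left the limit at 32 MiB, so remaining() returned 0, the loop exited early, and the final 8 MiB were silently dropped; the length assertion catches exactly that.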

test("toArray()") {
val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
Expand Down