
[SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value #21175

Closed

wants to merge 11 commits into from

Conversation

@jinhai-cloud (Contributor) commented Apr 27, 2018

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22

ChunkedByteBuffer.writeFully does not reset the buffer's limit after shrinking it for a partial write. When a chunk is larger than bufferWriteChunkSize, for example an 80 * 1024 * 1024 byte chunk against config.BUFFER_WRITE_CHUNK_SIZE (64 * 1024 * 1024), the while loop runs only once, so the remaining 16 * 1024 * 1024 bytes are lost.
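
A minimal sketch of the method before this fix, reconstructed from the diff hunks quoted later in this thread (a sketch, not a verbatim quote of the old source):

def writeFully(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {
    while (bytes.remaining() > 0) {
      // Shrink the limit so at most bufferWriteChunkSize bytes go out per write...
      val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      // ...but the limit is never restored: after the write, position == limit,
      // so remaining() is 0, the loop exits after one pass, and everything past
      // the first ioSize bytes of the chunk is silently dropped.
    }
  }
}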

@kiszk (Member) commented Apr 27, 2018

Would it be possible to add a unit test?

@jinhai-cloud (Contributor, Author) commented Apr 27, 2018

A unit test added in ChunkedByteBufferSuite:

test("writeFully() does not affect original buffer's position") {
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
assert(chunkedByteBuffer.getChunks().head.position() === 0)
}

@Ngone51 (Member) commented Apr 27, 2018

@Manbuyun you need to add the unit test into ChunkedByteBufferSuite.scala and push a new commit.

@Ngone51 (Member) left a review comment:

Better with a unit test.

@maropu (Member) commented Apr 27, 2018

Please add [CORE] to the title.

@jinhai-cloud jinhai-cloud changed the title [SPARK-24107] ChunkedByteBuffer.writeFully method has not reset the limit value [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value Apr 27, 2018
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("writeFully() does not affect original buffer's position") {
Member:

Hi @Manbuyun. You should add a new unit test that exercises your change, for example "writeFully() can write buffer which is larger than bufferWriteChunkSize correctly", and update the test code.

Contributor Author:

Done. Thanks

test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
assert(chunkedByteBuffer.getChunks().head.position() === 0)
Member:

This assert is unnecessary for this PR's change. Please assert the channel's length here instead.

test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
  val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
Member:

nit: add spaces around *.

Contributor Author:

Done. Thanks

test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
Member:

nit: Would it be possible to add SPARK-24107: at the start of the string? It would help us connect a unit test with its JIRA entry.

Contributor Author:

Done. Thanks

test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
assert(chunkedByteBuffer.size === (80L * 1024L * 1024L))
Member:

Assert ByteArrayWritableChannel's size, not chunkedByteBuffer's size.

Contributor Author:

My mistake; it has been fixed. Thanks.
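
For reference, a sketch of the corrected assertion (the channel val name is illustrative, and this assumes ByteArrayWritableChannel exposes a length() accessor for the bytes written, as Spark's test utilities use elsewhere):

val channel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
chunkedByteBuffer.writeFully(channel)
// Check how many bytes actually reached the channel, not the source buffer's size:
assert(channel.length() === chunkedByteBuffer.size.toInt)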

@Ngone51 (Member) commented Apr 27, 2018

@@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   */
  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val limit = bytes.limit()
      while (bytes.remaining() > 0) {
Member:

This is not related to this PR, but how about while (bytes.hasRemaining) {?

test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
  val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
@maropu (Member) commented Apr 27, 2018:

Can you configure bufferWriteChunkSize explicitly for this test purpose?

Contributor Author:

I have modified it. Please check.

def writeFully(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {
    val limit = bytes.limit()
Member:

How about renaming limit to curChunkLimit?

test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
  val bufferWriteChunkSize = Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
    .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
@maropu (Member) commented Apr 27, 2018:

How about setting this value via spark.buffer.write.chunkSize? e.g.,

sc.conf.set("spark.default.parallelism", "4")

Contributor Author:

OK, I have added it. Please check.

val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
bytes.limit(curChunkLimit)
Contributor:

I would rewrite this using:

try {
  val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
  bytes.limit(bytes.position() + ioSize)
  channel.write(bytes)
} finally {
  bytes.limit(curChunkLimit)
}

to be safe.

Contributor Author:

Right, in case channel.write throws an IOException.

Contributor Author:

I have committed this change.

@cloud-fan (Contributor) commented:

ok to test

@SparkQA commented Apr 27, 2018

Test build #89928 has finished for PR 21175 at commit fb527c8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member) left a review comment:

nit for style check

@@ -20,12 +20,12 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
Member:

move SharedSparkContext before SparkFunSuite

Contributor Author:

I have fixed it and committed. Thanks.

import com.google.common.io.ByteStreams
Member:

Add an empty line after line 22 to separate the Spark imports from the third-party group.

@SparkQA commented May 2, 2018

Test build #90035 has finished for PR 21175 at commit e78ef39.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit closed this in 152eaf6 on May 2, 2018
@cloud-fan (Contributor) commented:

The R test failure is a known issue; I'm merging into master and 2.3, thanks!

@dongjoon-hyun (Member) commented:

Hi, all. I created SPARK-24152 because we have started merging while ignoring that known but unresolved SparkR failure.

asfgit pushed a commit that referenced this pull request May 2, 2018
… the limit value

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22

ChunkedByteBuffer.writeFully does not reset the buffer's limit after shrinking it for a partial write. When a chunk is larger than bufferWriteChunkSize, for example an 80 * 1024 * 1024 byte chunk against config.BUFFER_WRITE_CHUNK_SIZE (64 * 1024 * 1024), the while loop runs only once, so the remaining 16 * 1024 * 1024 bytes are lost.

Author: WangJinhai02 <jinhai.wang02@ele.me>

Closes #21175 from manbuyun/bugfix-ChunkedByteBuffer.

(cherry picked from commit 152eaf6)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
        bytes.limit(bytes.position() + ioSize)
        channel.write(bytes)
      } finally {
Contributor:

I don't think we need the try and finally here because getChunks() returns duplicated ByteBuffers which have their own position and limit.

Contributor:

I think the problem is, bytes.limit(bytes.position() + ioSize) will change the result of bytes.hasRemaining, so we have to restore the limit in each loop.

Contributor:

I get your point: if there is an exception, there is no next loop iteration and we don't need to restore the limit, so the try ... finally is not needed.
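
Putting the thread together, a sketch of how the loop reads once the try ... finally is dropped and the limit is restored at the end of each iteration (approximating the follow-up cleanup in #21327, not a verbatim quote of it):

def writeFully(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {
    val originalLimit = bytes.limit()
    while (bytes.hasRemaining) {
      // Cap each write at bufferWriteChunkSize by shrinking the limit...
      val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      // ...then restore it so hasRemaining still sees the unwritten tail.
      bytes.limit(originalLimit)
    }
  }
}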

while (bytes.hasRemaining) {
  try {
    val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    bytes.limit(bytes.position() + ioSize)
Contributor:

The rationale for the limit() isn't super clear, but that was a problem in the original PR which introduced the bug (#18730). I'm commenting here only for cross-reference purposes, for folks who come across this patch in the future. I believe that the original motivation was http://www.evanjones.ca/java-bytebuffer-leak.html
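
For readers following that link, a minimal sketch of the leak pattern it describes (the file path and buffer size are illustrative; the per-thread caching happens inside the JDK's internal NIO code and, on JDK 8, is unbounded by default):

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

// Writing a large on-heap ByteBuffer through an NIO channel makes the JDK copy it
// into a temporary direct buffer that is cached per thread and sized to the largest
// write that thread has issued. One huge unchunked write can therefore pin a
// similarly huge native allocation, which is why writeFully() caps each write at
// bufferWriteChunkSize.
val out = FileChannel.open(
  Paths.get("/tmp/demo.bin"), StandardOpenOption.CREATE, StandardOpenOption.WRITE)
val big = ByteBuffer.allocate(256 * 1024 * 1024) // 256 MiB on the heap
out.write(big) // the JDK allocates (and caches) a ~256 MiB direct buffer here
out.close()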

@JoshRosen (Contributor) commented May 15, 2018 via email

ghost pushed a commit to dbtsai/spark that referenced this pull request May 17, 2018
… not reset the limit value

## What changes were proposed in this pull request?

According to the discussion in apache#21175 , this PR proposes 2 improvements:
1. add comments to explain why we call `limit` to write out `ByteBuffer` with slices.
2. remove the `try ... finally`

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21327 from cloud-fan/minor.