[SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value #21175
Conversation
Would it be possible to add a unit test?
In class ChunkedByteBufferSuite: test("writeFully() does not affect original buffer's position") {
@Manbuyun you need to add the unit test into ChunkedByteBufferSuite.
Better with a unit test. Please add one.
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("writeFully() does not affect original buffer's position") {
Hi @Manbuyun. You should add a new unit test to support your own change, for example: "writeFully() can write buffer which is larger than bufferWriteChunkSize correctly". And update the test code.
Done. Thanks
test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") { | ||
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024))) | ||
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) | ||
assert(chunkedByteBuffer.getChunks().head.position() === 0) |
This assert is unnecessary for this PR's change. Please replace it with an assert on the channel's length here.
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
nit: add spaces around the *.
Done. Thanks
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
nit: Would it be possible to add "SPARK-24107: " at the start of the string? It would help us connect a UT with its JIRA entry.
Done. Thanks
test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") { | ||
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024))) | ||
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) | ||
assert(chunkedByteBuffer.size === (80L * 1024L * 1024L)) |
ByteArrayWritableChannel's size, not chunkedByteBuffer's size.
My mistake; it has been fixed. Thanks
@@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   */
  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val limit = bytes.limit()
      while (bytes.remaining() > 0) {
This is not related to this PR though: how about while (bytes.hasRemaining) {?
@@ -56,6 +56,13 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
Can you configure bufferWriteChunkSize explicitly for this test's purpose?
I have modified it. Please check.
@@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   */
  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val limit = bytes.limit()
How about renaming limit to curChunkLimit?
@@ -56,6 +56,15 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    val bufferWriteChunkSize = Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
How about setting this value via spark.buffer.write.chunkSize? e.g., sc.conf.set("spark.default.parallelism", "4")
OK, I have added it. Please check.
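For reference, a minimal sketch of what the conf-driven test might look like after this round. It assumes the suite mixes in SharedSparkContext so that sc is available; the "32k" chunk size and 40 KB buffer are illustrative values, not taken from this thread:

test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
  try {
    // Shrink the chunk size so a small test buffer exceeds it, instead of allocating 80 MB.
    sc.conf.set(config.BUFFER_WRITE_CHUNK_SIZE.key, "32k")
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(40 * 1024)))
    val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
    chunkedByteBuffer.writeFully(byteArrayWritableChannel)
    // Before the fix, only the first bufferWriteChunkSize bytes would reach the channel.
    assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size)
  } finally {
    sc.conf.remove(config.BUFFER_WRITE_CHUNK_SIZE.key)
  }
}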
      val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      bytes.limit(curChunkLimit)
I would rewrite this using:

try {
  val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
  bytes.limit(bytes.position() + ioSize)
  channel.write(bytes)
} finally {
  bytes.limit(curChunkLimit)
}

to be safe.
Right. That covers the case when the channel write throws an IOException.
I have committed this modification.
ok to test
Test build #89928 has finished for PR 21175 at commit
nit for the style check:
@@ -20,12 +20,12 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
move SharedSparkContext before SparkFunSuite
I have fixed it and committed. Thanks
@@ -20,12 +20,12 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams
add an empty line after line 22 to separate the Spark and third-party import groups.
Test build #90035 has finished for PR 21175 at commit
The R test failure is a known issue. I'm merging into master and 2.3, thanks!
Hi, All.
[SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22

ChunkedByteBuffer.writeFully did not reset the buffer's limit value. When a chunk is larger than bufferWriteChunkSize, e.g. 80 * 1024 * 1024 bytes against the 64 * 1024 * 1024 default of config.BUFFER_WRITE_CHUNK_SIZE, the while loop runs only once and the remaining 16 * 1024 * 1024 bytes are silently lost.

Author: WangJinhai02 <jinhai.wang02@ele.me>

Closes #21175 from manbuyun/bugfix-ChunkedByteBuffer.

(cherry picked from commit 152eaf6)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
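To make the failure mode concrete, here is a self-contained sketch (not Spark code) of the buggy loop, with a hypothetical 10-byte buffer and a 4-byte cap standing in for bufferWriteChunkSize:

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

val out = new ByteArrayOutputStream()
val channel: WritableByteChannel = Channels.newChannel(out)
val bytes = ByteBuffer.allocate(10) // one "chunk" of 10 bytes
val cap = 4                         // stand-in for bufferWriteChunkSize

// Buggy loop: the shrunken limit is never restored, so after the first write
// remaining() == limit - position == 0 and the loop exits early.
while (bytes.remaining() > 0) {
  val ioSize = math.min(bytes.remaining(), cap)
  bytes.limit(bytes.position() + ioSize)
  channel.write(bytes)
}
assert(out.size() == 4) // only 4 of the 10 bytes were written; the other 6 were lost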
        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
        bytes.limit(bytes.position() + ioSize)
        channel.write(bytes)
      } finally {
I don't think we need the try and finally here, because getChunks() returns duplicated ByteBuffers which have their own position and limit.
I think the problem is, bytes.limit(bytes.position() + ioSize) will change the result of bytes.hasRemaining, so we have to restore the limit in each loop iteration.
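A tiny illustration of that interaction, using a plain JDK ByteBuffer with hypothetical numbers:

import java.nio.ByteBuffer

val buf = ByteBuffer.allocate(100) // position = 0, limit = 100, remaining = 100
buf.limit(64)                      // pretend bufferWriteChunkSize is 64
buf.position(64)                   // what a completed channel.write would do
assert(!buf.hasRemaining)          // remaining == 0: without a restore, the loop stops here
buf.limit(100)                     // restoring the limit exposes the unwritten tail
assert(buf.remaining == 36)        // the last 36 bytes become visible again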
I get your point. If there is an exception, there is no next loop iteration, so we don't need to restore the limit; the try/finally is not needed.
    while (bytes.hasRemaining) {
      try {
        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
        bytes.limit(bytes.position() + ioSize)
The rationale for the limit() isn't super clear, but that was a problem in the original PR which introduced the bug (#18730). I'm commenting here only for cross-reference purposes, for folks who come across this patch in the future. I believe that the original motivation was http://www.evanjones.ca/java-bytebuffer-leak.html
No, I mean that the code here can simply follow the write call as straight-through code. We don't need to guard against exceptions here because the duplicate of the buffer is used only by a single thread, so you can omit the try block and just concatenate the try contents to the finally contents. Minor bit, but I wanted to comment because I was initially confused about when errors could occur and about thread safety / sharing, until I realized that the modified state does not escape this method.
On Mon, May 14, 2018 at 9:03 PM, Wenchen Fan commented on this pull request, quoting the change in core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala (#21175 (comment)):

@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   */
  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
-     while (bytes.remaining() > 0) {
-       val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-       bytes.limit(bytes.position() + ioSize)
-       channel.write(bytes)
+     val curChunkLimit = bytes.limit()
+     while (bytes.hasRemaining) {
+       try {
+         val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+         bytes.limit(bytes.position() + ioSize)
+         channel.write(bytes)
+       } finally {

Do you mean this is not a real bug that can cause real workload to fail?
… not reset the limit value

## What changes were proposed in this pull request?

According to the discussion in apache#21175, this PR proposes 2 improvements:
1. add comments to explain why we call `limit` to write out `ByteBuffer` with slices.
2. remove the `try ... finally`

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21327 from cloud-fan/minor.
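A sketch of what writeFully plausibly looks like after that follow-up: try/finally removed, with the rationale captured in a comment (the wording below is paraphrased, not quoted from the merged code):

def writeFully(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {
    val originalLimit = bytes.limit()
    while (bytes.hasRemaining) {
      // If `bytes` is an on-heap buffer, a NIO write copies it into a per-thread cached
      // direct buffer sized to the write, so each write is capped at bufferWriteChunkSize
      // to keep that cached direct buffer from growing to the full chunk size.
      val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      bytes.limit(originalLimit) // restore so the next remaining() sees the unwritten tail
    }
  }
}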