Skip to content

Commit 00074b5

Browse files
committed
[SPARK-19062] Utils.writeByteBuffer bug fix
This commit changes Utils.writeByteBuffer so that it does not change the position of the ByteBuffer that it writes out, and adds a unit test for this functionality. cc mridulm Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16462 from kayousterhout/SPARK-19062.
1 parent 4262fb0 commit 00074b5

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,11 @@ private[spark] object Utils extends Logging {
237237
if (bb.hasArray) {
238238
out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
239239
} else {
240+
val originalPosition = bb.position()
240241
val bbval = new Array[Byte](bb.remaining())
241242
bb.get(bbval)
242243
out.write(bbval)
244+
bb.position(originalPosition)
243245
}
244246
}
245247

@@ -250,9 +252,11 @@ private[spark] object Utils extends Logging {
250252
if (bb.hasArray) {
251253
out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
252254
} else {
255+
val originalPosition = bb.position()
253256
val bbval = new Array[Byte](bb.remaining())
254257
bb.get(bbval)
255258
out.write(bbval)
259+
bb.position(originalPosition)
256260
}
257261
}
258262

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.spark.util
1919

20-
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, PrintStream}
20+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File,
21+
FileOutputStream, PrintStream}
2122
import java.lang.{Double => JDouble, Float => JFloat}
2223
import java.net.{BindException, ServerSocket, URI}
2324
import java.nio.{ByteBuffer, ByteOrder}
@@ -389,6 +390,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
389390
assert(Utils.deserializeLongValue(bbuf.array) === testval)
390391
}
391392

393+
test("writeByteBuffer should not change ByteBuffer position") {
394+
// Test a buffer with an underlying array, for both writeByteBuffer methods.
395+
val testBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
396+
assert(testBuffer.hasArray)
397+
val bytesOut = new ByteBufferOutputStream(4096)
398+
Utils.writeByteBuffer(testBuffer, bytesOut)
399+
assert(testBuffer.position() === 0)
400+
401+
val dataOut = new DataOutputStream(bytesOut)
402+
Utils.writeByteBuffer(testBuffer, dataOut: DataOutput)
403+
assert(testBuffer.position() === 0)
404+
405+
// Test a buffer without an underlying array, for both writeByteBuffer methods.
406+
val testDirectBuffer = ByteBuffer.allocateDirect(8)
407+
assert(!testDirectBuffer.hasArray())
408+
Utils.writeByteBuffer(testDirectBuffer, bytesOut)
409+
assert(testDirectBuffer.position() === 0)
410+
411+
Utils.writeByteBuffer(testDirectBuffer, dataOut: DataOutput)
412+
assert(testDirectBuffer.position() === 0)
413+
}
414+
392415
test("get iterator size") {
393416
val empty = Seq[Int]()
394417
assert(Utils.getIteratorSize(empty.toIterator) === 0L)

0 commit comments

Comments
 (0)