-
Notifications
You must be signed in to change notification settings - Fork 28.5k
[SPARK-3121] Wrong implementation of implicit bytesWritableConverter #2712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Can one of the admins verify this patch? |
Jenkins, please start the test. |
QA tests have started for PR 2712 at commit
|
Great catch. A concern is that calling Array#take requires an implicit conversion, which has some performance impact that might be unacceptable for this method that can get called in a tight loop. http://villane.wordpress.com/2008/02/02/learning-scala-performance-impact-of-implicit-conversions/ |
QA tests have finished for PR 2712 at commit
|
Originaly I wanted to just replace getBytes method with copyBytes. It is available in newer versions of api but I found an older version is imported in spark. I am not very familiar with what hadoop api is used in spark yet. So do you suggest to implement it without usage of take method? |
Hmm, yeah, copyBytes is no good if it doesn't appear in Hadoop 1. My suggestion would be to use from copyOfRange from java.util.Arrays. |
I pushed the new version. I guess jenkins test will kick out automatically right? |
Jenkins, test this please. |
Can one of the admins verify this patch? |
Jenkins, test this please. |
Jenkins, this is ok to test. |
QA tests have started for PR 2712 at commit
|
QA tests have finished for PR 2712 at commit
|
Test FAILed. |
@@ -1409,7 +1411,7 @@ object SparkContext extends Logging { | |||
simpleWritableConverter[Boolean, BooleanWritable](_.get) | |||
|
|||
implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = { | |||
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes) | |||
simpleWritableConverter[Array[Byte], BytesWritable](bw => Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like this goes past 100 characters
QA tests have started for PR 2712 at commit
|
QA tests have finished for PR 2712 at commit
|
Test FAILed. |
It's failing at FlumeStreamSuite.scala:109 which seems to be unrelated to this patch. |
One more nit: the added java import should go with the other java imports. |
Otherwise, LGTM |
Can it be that test Flume test failed due to upstream changes? It is passing for me locally now. |
QA tests have started for PR 2712 at commit
|
QA tests have finished for PR 2712 at commit
|
Test PASSed. |
That particular Flume test is known to be flaky; I think that TD is working on a rewrite / fix for that test suite. |
import org.apache.hadoop.io.BytesWritable | ||
|
||
class SparkContextSuite extends FunSuite { | ||
test("test of writing spark scala test") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test could use a better name. I'd also add a comment, like // Regression test for SPARK-3121
to help readers link this back to the JIRA.
Actually, ignore my earlier (deleted) comments; this looks like a valid issue (see HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes"). |
@@ -1409,7 +1410,9 @@ object SparkContext extends Logging { | |||
simpleWritableConverter[Boolean, BooleanWritable](_.get) | |||
|
|||
implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = { | |||
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes) | |||
simpleWritableConverter[Array[Byte], BytesWritable](bw => | |||
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a one-line comment here that explains why we need to make this copy?
This looks good to me; sorry for my earlier confusion. If you add a comment and change the name of the test, I'll merge this and cherry-pick it back into |
Sorry for the test name. Now it should be all fine including commets. |
QA tests have started for PR 2712 at commit
|
QA tests have finished for PR 2712 at commit
|
Test PASSed. |
This looks great; thanks for adding the comments. I'm going to merge this into master and backport it to |
val path = ... //path to seq file with BytesWritable as type of both key and value val file = sc.sequenceFile[Array[Byte],Array[Byte]](path) file.take(1)(0)._1 This prints incorrect content of byte array. Actual content starts with correct one and some "random" bytes and zeros are appended. BytesWritable has two methods: getBytes() - return content of all internal array which is often longer then actual value stored. It usually contains the rest of previous longer values copyBytes() - return just begining of internal array determined by internal length property It looks like in implicit conversion between BytesWritable and Array[byte] getBytes is used instead of correct copyBytes. dbtsai Author: Jakub Dubovský <james64@inMail.sk> Author: Dubovsky Jakub <dubovsky@avast.com> Closes #2712 from james64/3121-bugfix and squashes the following commits: f85d24c [Jakub Dubovský] Test name changed, comments added 1b20d51 [Jakub Dubovský] Import placed correctly 406e26c [Jakub Dubovský] Scala style fixed f92ffa6 [Dubovsky Jakub] performance tuning 480f9cd [Dubovsky Jakub] Bug 3121 fixed (cherry picked from commit fc616d5) Signed-off-by: Josh Rosen <joshrosen@apache.org>
val path = ... //path to seq file with BytesWritable as type of both key and value val file = sc.sequenceFile[Array[Byte],Array[Byte]](path) file.take(1)(0)._1 This prints incorrect content of byte array. Actual content starts with correct one and some "random" bytes and zeros are appended. BytesWritable has two methods: getBytes() - return content of all internal array which is often longer then actual value stored. It usually contains the rest of previous longer values copyBytes() - return just begining of internal array determined by internal length property It looks like in implicit conversion between BytesWritable and Array[byte] getBytes is used instead of correct copyBytes. dbtsai Author: Jakub Dubovský <james64@inMail.sk> Author: Dubovsky Jakub <dubovsky@avast.com> Closes #2712 from james64/3121-bugfix and squashes the following commits: f85d24c [Jakub Dubovský] Test name changed, comments added 1b20d51 [Jakub Dubovský] Import placed correctly 406e26c [Jakub Dubovský] Scala style fixed f92ffa6 [Dubovsky Jakub] performance tuning 480f9cd [Dubovsky Jakub] Bug 3121 fixed (cherry picked from commit fc616d5) Signed-off-by: Josh Rosen <joshrosen@apache.org>
val path = ... //path to seq file with BytesWritable as type of both key and value
val file = sc.sequenceFileArray[Byte],Array[Byte]
file.take(1)(0)._1
This prints incorrect content of byte array. Actual content starts with correct one and some "random" bytes and zeros are appended. BytesWritable has two methods:
getBytes() - return content of all internal array which is often longer then actual value stored. It usually contains the rest of previous longer values
copyBytes() - return just begining of internal array determined by internal length property
It looks like in implicit conversion between BytesWritable and Array[byte] getBytes is used instead of correct copyBytes.
@dbtsai