[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes() #15043
Conversation
Test build #65199 has finished for PR 15043 at commit
#15056 also touches this code and creates a new test suite for this component, so I'd prefer to merge that PR first. Edit: upon further inspection, I think these could be merged independently.
```diff
@@ -782,6 +785,9 @@ private[storage] class PartiallySerializedBlock[T](
    * `close()` on it to free its resources.
    */
   def valuesIterator: PartiallyUnrolledIterator[T] = {
+    // Close the serialization stream so that the serializer's internal buffers are freed and any
+    // "end-of-stream" markers can be written out so that `unrolled` is a valid serialized stream.
+    serializationStream.close()
```
It seems like `unrolled` may basically be invalid until `serializationStream` is closed. But it looks like `valuesIterator` is not the only place where `unrolled` is used.
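For context on why the close matters, here is a rough standalone sketch using Spark's internal serializer API (the record count is illustrative): bytes written through a `SerializationStream` may not form a complete stream until `close()` is called.

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Kryo buffers output internally, so bytes observed before close() may be
// missing trailing records; close() flushes the buffer and finalizes the stream.
val ser = new KryoSerializer(new SparkConf()).newInstance()
val bos = new ByteArrayOutputStream()
val stream = ser.serializeStream(bos)
(1 to 1000).foreach(i => stream.writeObject(i))
val bytesBeforeClose = bos.size() // may be smaller than the full serialized size
stream.close()                    // flushes internal buffers, writes end-of-stream markers
val bytesAfterClose = bos.size()  // now `bos` holds a complete, valid stream
```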
Test build #65341 has finished for PR 15043 at commit
Test build #65393 has finished for PR 15043 at commit
Jenkins, retest this please.
Test build #65390 has finished for PR 15043 at commit
Test build #65397 has finished for PR 15043 at commit
Test build #65407 has finished for PR 15043 at commit
```scala
    }
    consumed = true
```
Hmm, why set `consumed = true` here? What's the problem with calling `verifyNotConsumedAndNotDiscarded()` twice?
I was being overly clever; it's clearer to just set `consumed = true` after each of the `verifyNotConsumedAndNotDiscarded()` call sites.
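A minimal standalone sketch of that pattern (the class below mirrors the names in this discussion but is an illustration, not the actual Spark code):

```scala
// Illustrative one-shot resource: each public entry point verifies the
// preconditions and then marks the block consumed at the call site.
class OneShotBlock {
  private var consumed = false
  private var discarded = false

  private def verifyNotConsumedAndNotDiscarded(): Unit = {
    require(!consumed, "Block was already consumed")
    require(!discarded, "Block was already discarded")
  }

  def finishWritingToStream(): Unit = {
    verifyNotConsumedAndNotDiscarded()
    consumed = true // set explicitly here rather than inside the verify helper
  }

  def valuesIterator: Iterator[Int] = {
    verifyNotConsumedAndNotDiscarded()
    consumed = true
    Iterator.empty
  }
}
```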
```scala
        serializationStream = serializationStream,
        redirectableOutputStream,
        unrollMemory = unrollMemory,
        memoryMode = MemoryMode.ON_HEAP,
```
What happens if the memory mode is OFF_HEAP? Is that relevant?
It should only affect the memory accounting for unroll memory. If you're caching a block at MEMORY_SER storage level and are using off-heap caching, then it's possible for the unrolled memory to be off-heap (so the ChunkedByteBufferOutputStream will be using a DirectBuffer allocator). In this case we need to count this as off-heap unroll memory so that Spark's off-heap allocations can respect the configured off-heap memory limit.

Given that off-heap caching (and thus off-heap unrolling) is a relatively new experimental feature, it's entirely possible that there are accounting bugs within this path. I'm going to try to expand this test suite to also exercise that case just to be 100% sure that we're accounting properly.
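To illustrate the accounting distinction, here is a sketch of how the memory mode would pick the buffer allocator; it mirrors the pattern in `MemoryStore.putIteratorAsBytes()`, but treat the exact code as an assumption:

```scala
import java.nio.ByteBuffer

import org.apache.spark.memory.MemoryMode
import org.apache.spark.unsafe.Platform

// ON_HEAP unrolling uses ordinary heap ByteBuffers; OFF_HEAP unrolling uses
// DirectBuffers, which must be charged against the off-heap memory limit.
def allocatorFor(memoryMode: MemoryMode): Int => ByteBuffer = memoryMode match {
  case MemoryMode.ON_HEAP  => ByteBuffer.allocate _
  case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
```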
```diff
       new PartiallyUnrolledIterator(
         memoryStore,
         unrollMemory,
-        unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
+        unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, unrolledBuffer.dispose()),
```
Why the change from `discard()` to `dispose()`? You've made `discard()` idempotent, right? Does the caller have to manually release memory after the iterator is consumed?
There was a subtle bug in the old code where the call to `dispose()` would end up freeing unroll memory for the buffer, but that same memory would also be freed by `PartiallyUnrolledIterator` itself. This was exposed by the Mockito verify calls in the new test suite.

Given that `unrolledBuffer.toInputStream(dispose = true)` will also handle disposal of the buffer, I don't think we even need this `CompletionIterator` here. Let me see about removing it.
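For reference, `CompletionIterator` (a Spark utility) runs its completion function exactly once, after the wrapped iterator is exhausted, which is why pairing it with a cleanup that also runs elsewhere risks a double free. A tiny demo:

```scala
import org.apache.spark.util.CompletionIterator

var cleanups = 0
val it = CompletionIterator[Int, Iterator[Int]](Iterator(1, 2, 3), { cleanups += 1 })
it.toList             // fully consumes the iterator; the completion function fires here
assert(cleanups == 1) // runs once; a second cleanup path would double-free the buffer
```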
```scala
      Mockito.verify(partiallySerializedBlock.invokePrivate(getRedirectableOutputStream())).close()

      val deserializedItems = valuesIterator.toArray.toSeq
      Mockito.verify(memoryStore).releaseUnrollMemoryForThisTask(
```
What's the code path that makes `releaseUnrollMemoryForThisTask` get called, given that the completion task returned by `valuesIterator` has changed from `discard()` to `dispose()`?
Any non-freed unroll memory will be automatically freed at the end of the task (as part of the `Executor` or `Task` code itself). Before the task has completed, though, `PartiallyUnrolledIterator` will free the specified amount of unroll memory once the unrolled iterator is fully consumed.
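A simplified sketch of that release-on-full-consumption behavior (the real logic lives in `PartiallyUnrolledIterator`, which also hands off to a `rest` iterator; this is an illustration only):

```scala
// Illustrative iterator that releases its tracked unroll memory exactly once,
// as soon as the unrolled portion has been fully consumed.
class ReleaseOnExhaustion[T](unrolled: Iterator[T], releaseUnrollMemory: () => Unit)
    extends Iterator[T] {
  private var released = false

  override def hasNext: Boolean = {
    val more = unrolled.hasNext
    if (!more && !released) {
      releaseUnrollMemory() // e.g. memoryStore.releaseUnrollMemoryForThisTask(...)
      released = true
    }
    more
  }

  override def next(): T = unrolled.next()
}
```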
Thanks for the good review feedback. I think that there might indeed be a pre-existing bug related to off-heap unroll accounting here, so let me try to also catch that via strengthened test cases. I'll update this patch to address your feedback.
Alright, I've updated this to address the latest round of review feedback. I did manage to spot a memory-accounting problem with off-heap memory because (Whoops, didn't mean to submit this comment early; my laptop trackpad glitched out and caused a bunch of spurious mouse clicks).
Test build #65465 has finished for PR 15043 at commit
This looks good. I think a test dimension with `memoryMode = OFF_HEAP` would be useful.
```scala
    }
  }

  test("cannot call valuesIterator() after finishWritingToStream()") {
```
Minor: you can probably combine `test("cannot call valuesIterator() more than once")` and `test("cannot call finishWritingToStream() after valuesIterator()")` into one test. Same for making calls after `finishWritingToStream()`.
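A hypothetical sketch of the suggested consolidation; the `makeBlock()` factory is a stand-in for however the real suite constructs a `PartiallySerializedBlock` with mocks, and the exception type is assumed:

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.SparkFunSuite

class CombinedPreconditionsSuite extends SparkFunSuite {
  // Hypothetical factory; the real suite builds a PartiallySerializedBlock with mocks.
  private def makeBlock() = ???

  test("no calls allowed after valuesIterator()") {
    val block = makeBlock()
    block.valuesIterator
    intercept[IllegalStateException] { block.valuesIterator }
    intercept[IllegalStateException] {
      block.finishWritingToStream(new ByteArrayOutputStream())
    }
  }
}
```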
```diff
@@ -33,7 +33,7 @@ class PartiallyUnrolledIteratorSuite extends SparkFunSuite with MockitoSugar {
     val rest = (unrollSize until restSize + unrollSize).iterator

     val memoryStore = mock[MemoryStore]
-    val joinIterator = new PartiallyUnrolledIterator(memoryStore, unrollSize, unroll, rest)
+    val joinIterator = new PartiallyUnrolledIterator(memoryStore, ON_HEAP, unrollSize, unroll, rest)
```
We should look into trying to test this with OFF_HEAP as well.
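One common way to add that dimension, sketched under the assumption that the test body can be parameterized by memory mode (the suite name and body are placeholders):

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode

class MemoryModeDimensionSuite extends SparkFunSuite {
  // Generate one copy of the test per memory mode so that OFF_HEAP
  // accounting is exercised alongside ON_HEAP.
  for (mode <- Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP)) {
    test(s"unroll memory accounting ($mode)") {
      assert(mode != null) // placeholder body; the real assertions go here
    }
  }
}
```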
Jenkins, retest this please.
I agree that we should add more off-heap tests, but I'd like to do it in another patch so that we can get this one merged faster to unblock the 2.0.1 RC. In terms of testing off-heap, I think that one of the best high-level tests / asserts would be to strengthen the

I'm going to retest this and if it passes tests then I'll merge to master and branch-2.0. I'll add the new tests described above in a followup.
Test build #65541 has finished for PR 15043 at commit
I believe that this latest test failure is caused by a known flaky PySpark test, so I'm going to merge this now and will monitor tests afterwards.
…utIteratorAsBytes()

## What changes were proposed in this pull request?

`MemoryStore.putIteratorAsBytes()` may silently lose values when used with `KryoSerializer` because it does not properly close the serialization stream before attempting to deserialize the already-serialized values, which may cause values buffered in Kryo's internal buffers to not be read.

This is the root cause behind a user-reported "wrong answer" bug in PySpark caching reported by bennoleslie on the Spark user mailing list in a thread titled "pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK". Due to Spark 2.0's automatic use of KryoSerializer for "safe" types (such as byte arrays, primitives, etc.), this misuse of serializers manifested itself as silent data corruption rather than a StreamCorrupted error (which you might get from JavaSerializer).

The minimal fix, implemented here, is to close the serialization stream before attempting to deserialize written values. In addition, this patch adds several additional assertions / precondition checks to prevent misuse of `PartiallySerializedBlock` and `ChunkedByteBufferOutputStream`.

## How was this patch tested?

The original bug was masked by an invalid assert in the memory store test cases: the old assert compared two results record-by-record with `zip` but didn't first check that the lengths of the two collections were equal, causing missing records to go unnoticed. The updated test case reproduced this bug. In addition, I added a new `PartiallySerializedBlockSuite` to unit test that component.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.

(cherry picked from commit 8faa521)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
What changes were proposed in this pull request?

`MemoryStore.putIteratorAsBytes()` may silently lose values when used with `KryoSerializer` because it does not properly close the serialization stream before attempting to deserialize the already-serialized values, which may cause values buffered in Kryo's internal buffers to not be read.

This is the root cause behind a user-reported "wrong answer" bug in PySpark caching reported by @bennoleslie on the Spark user mailing list in a thread titled "pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK". Due to Spark 2.0's automatic use of KryoSerializer for "safe" types (such as byte arrays, primitives, etc.), this misuse of serializers manifested itself as silent data corruption rather than a StreamCorrupted error (which you might get from JavaSerializer).

The minimal fix, implemented here, is to close the serialization stream before attempting to deserialize written values. In addition, this patch adds several additional assertions / precondition checks to prevent misuse of `PartiallySerializedBlock` and `ChunkedByteBufferOutputStream`.

How was this patch tested?

The original bug was masked by an invalid assert in the memory store test cases: the old assert compared two results record-by-record with `zip` but didn't first check that the lengths of the two collections were equal, causing missing records to go unnoticed. The updated test case reproduced this bug.

In addition, I added a new `PartiallySerializedBlockSuite` to unit test that component.
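To make the masking concrete, here is a small self-contained illustration of why a `zip`-based comparison without a length check lets dropped trailing records pass:

```scala
// zip truncates to the shorter collection, so if `actual` silently lost
// trailing records, the element-wise assert still passes.
val expected = Seq(1, 2, 3, 4, 5)
val actual = Seq(1, 2, 3) // records lost by the serialization bug

expected.zip(actual).foreach { case (e, a) => assert(e == a) } // passes!

// The strengthened test checks the lengths first, which exposes the loss.
assert(actual.length == expected.length)
```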