[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes() #15043

Conversation

@JoshRosen (Contributor) commented Sep 10, 2016

What changes were proposed in this pull request?

MemoryStore.putIteratorAsBytes() may silently lose values when used with KryoSerializer because it does not properly close the serialization stream before attempting to deserialize the already-serialized values, which may cause values buffered in Kryo's internal buffers to not be read.

This is the root cause behind a user-reported "wrong answer" bug in PySpark caching reported by @bennoleslie on the Spark user mailing list in a thread titled "pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK". Due to Spark 2.0's automatic use of KryoSerializer for "safe" types (such as byte arrays, primitives, etc.), this misuse of serializers manifested itself as silent data corruption rather than a StreamCorruptedException (which you might get from JavaSerializer).

The minimal fix, implemented here, is to close the serialization stream before attempting to deserialize written values. In addition, this patch adds several additional assertions / precondition checks to prevent misuse of PartiallySerializedBlock and ChunkedByteBufferOutputStream.
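To illustrate the failure mode, here is a minimal, self-contained sketch (not Spark code) that uses java.io buffering as a stand-in for Kryo's internal buffer: bytes still sitting in the serializer's buffer are invisible to a reader that consumes the underlying output before close() is called, so trailing records silently disappear.

```scala
import java.io._

object CloseBeforeReadBack {
  // Read back as many ints as the byte array actually contains.
  private def readInts(bytes: Array[Byte]): Seq[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val result = Seq.newBuilder[Int]
    try { while (true) result += in.readInt() } catch { case _: EOFException => }
    result.result()
  }

  def main(args: Array[String]): Unit = {
    val underlying = new ByteArrayOutputStream()
    // Stand-in for a SerializationStream with internal buffering (like Kryo's Output buffer).
    val serStream = new DataOutputStream(new BufferedOutputStream(underlying, 4096))
    (1 to 1000).foreach(i => serStream.writeInt(i))

    // Bug: reading the underlying bytes before close() misses whatever is still buffered.
    val withoutClose = readInts(underlying.toByteArray)

    // Fix: close (and therefore flush) the serialization stream first, then read back.
    serStream.close()
    val withClose = readInts(underlying.toByteArray)

    // Prints: records read without close(): 0, with close(): 1000
    println(s"records read without close(): ${withoutClose.size}, with close(): ${withClose.size}")
  }
}
```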

How was this patch tested?

The original bug was masked by an invalid assert in the memory store test cases: the old assert compared two results record-by-record with zip but didn't first check that the lengths of the two collections were equal, causing missing records to go unnoticed. The updated test case reproduced this bug.
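For illustration only (hypothetical values, not the actual suite), the flaw in the old assertion and the stricter check look roughly like this:

```scala
val expected = Seq(1, 2, 3, 4, 5)
val actual   = Seq(1, 2, 3)  // suppose some trailing records were silently dropped

// Old-style check: zip truncates to the shorter collection, so missing records pass unnoticed.
expected.zip(actual).foreach { case (e, a) => assert(e == a) }

// Checking the lengths (or comparing the whole collections) catches the missing records.
assert(expected.length == actual.length)
assert(expected == actual)
```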

In addition, I added a new PartiallySerializedBlockSuite to unit test that component.

@SparkQA commented Sep 10, 2016

Test build #65199 has finished for PR 15043 at commit 35a32e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lins05 (Contributor) commented Sep 10, 2016

Did a simple test and it does fix the bug. One interesting thing (without this patch) is that while records.count() returns a smaller number than the actual count, the Spark UI still shows the correct record count; in my test case it was 2999808 vs. 3000000.

[Two Spark UI screenshots showing the record counts]

@JoshRosen (Contributor, Author) commented Sep 12, 2016

#15056 also touches this code and creates a new test suite for this component so I'd prefer to merge that PR first.

Edit: upon further inspection I think these could be merged independently.

@@ -782,6 +785,9 @@ private[storage] class PartiallySerializedBlock[T](
   * `close()` on it to free its resources.
   */
  def valuesIterator: PartiallyUnrolledIterator[T] = {
+    // Close the serialization stream so that the serializer's internal buffers are freed and any
+    // "end-of-stream" markers can be written out so that `unrolled` is a valid serialized stream.
+    serializationStream.close()

Reviewer comment (Contributor):

It seems like `unrolled` may basically be invalid until serializationStream.close() is called.

But it looks like valuesIterator is not the only place where unrolled is used.

@SparkQA commented Sep 14, 2016

Test build #65341 has finished for PR 15043 at commit 2f43e69.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 14, 2016

Test build #65393 has finished for PR 15043 at commit ccf929f.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor, Author) commented:

Jenkins, retest this please.

@SparkQA commented Sep 14, 2016

Test build #65390 has finished for PR 15043 at commit c4e50e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 14, 2016

Test build #65397 has finished for PR 15043 at commit ccf929f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 15, 2016

Test build #65407 has finished for PR 15043 at commit daf447b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
consumed = true

@srinathshankar (Contributor) Sep 15, 2016:

Hmm, why set consumed = true here? What's the problem with calling verifyNotConsumedAndNotDiscarded() twice?

@JoshRosen (Author):

I was being overly clever; it's clearer to just set consumed = true after each of the verifyNotConsumedAndNotDiscarded() call sites.
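A small, hypothetical sketch of the guard pattern being discussed (simplified class and messages, not the actual PartiallySerializedBlock code), with consumed set at each call site:

```scala
// Simplified stand-in for a block that may be read back exactly once and never after discard().
class GuardedBlock {
  private var consumed = false
  private var discarded = false

  private def verifyNotConsumedAndNotDiscarded(): Unit = {
    require(!consumed, "can only call one of finishWritingToStream() or valuesIterator(), and only once")
    require(!discarded, "cannot use a discarded block")
  }

  def finishWritingToStream(): Unit = {
    verifyNotConsumedAndNotDiscarded()
    consumed = true  // set at the call site rather than inside the verify helper
    // ... stream out the serialized chunks ...
  }

  def valuesIterator(): Iterator[Int] = {
    verifyNotConsumedAndNotDiscarded()
    consumed = true
    Iterator.empty  // ... deserialize and return the values ...
  }

  def discard(): Unit = {
    discarded = true
    // ... free buffers and unroll memory ...
  }
}
```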

serializationStream = serializationStream,
redirectableOutputStream,
unrollMemory = unrollMemory,
memoryMode = MemoryMode.ON_HEAP,

@srinathshankar (Contributor) Sep 15, 2016:

What happens if the memory mode is OFF_HEAP? Is that relevant?

@JoshRosen (Author):

It should only affect the memory accounting for unroll memory. If you're caching a block at a serialized storage level and are using off-heap caching, then it's possible for the unrolled memory to be off-heap (so the ChunkedByteBufferOutputStream will be using a DirectBuffer allocator). In this case we need to count this as off-heap unroll memory so that Spark's off-heap allocations can respect the configured off-heap memory limit.

Given that off-heap caching (and thus off-heap unrolling) is a relatively new experimental feature, it's entirely possible that there are accounting bugs within this path. I'm going to try to expand this test suite to also exercise that case just to be 100% sure that we're accounting properly.
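A hedged sketch of that accounting rule (illustrative types; the enum mirrors Spark's MemoryMode but the rest is not Spark's API): the allocator backing the unroll buffer follows the memory mode, so the reserved unroll memory has to be charged to that same mode.

```scala
import java.nio.ByteBuffer

object UnrollAccountingSketch {
  sealed trait MemoryMode
  case object OnHeap extends MemoryMode
  case object OffHeap extends MemoryMode

  // The buffer allocator is chosen by memory mode: heap ByteBuffers vs. direct (off-heap) buffers.
  def allocatorFor(mode: MemoryMode): Int => ByteBuffer = mode match {
    case OnHeap  => size => ByteBuffer.allocate(size)
    case OffHeap => size => ByteBuffer.allocateDirect(size)
  }

  // Whichever mode allocated the buffer is the pool that must be charged for unroll memory;
  // charging the on-heap pool for a direct buffer would let off-heap usage exceed its limit.
  def reserveUnroll(mode: MemoryMode, bytes: Long): Unit =
    println(s"charging $bytes bytes of unroll memory to the $mode pool")

  def main(args: Array[String]): Unit = {
    val mode: MemoryMode = OffHeap
    val chunk = allocatorFor(mode)(4 * 1024 * 1024)
    reserveUnroll(mode, chunk.capacity().toLong)
  }
}
```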

new PartiallyUnrolledIterator(
memoryStore,
unrollMemory,
unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, unrolledBuffer.dispose()),

@srinathshankar (Contributor) Sep 15, 2016:

Why the change from discard() to dispose()? You've made discard() idempotent, right? Does the caller have to manually release memory after the iterator is consumed?

@JoshRosen (Author):

There was a subtle bug in the old code where the call to dispose() would end up freeing unroll memory for the buffer, but that same memory would also be freed by PartiallyUnrolledIterator itself. This was exposed by the Mockito verify calls in the new test suite.

Given that unrolledBuffer.toInputStream(dispose = true) will also handle disposal of the buffer, I don't think we even need this CompletionIterator here. Let me see about removing it.
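Roughly how a Mockito verify surfaces such a double free (hypothetical trait and sizes; the real suite verifies MemoryStore.releaseUnrollMemoryForThisTask): the release must happen exactly once per buffer.

```scala
import org.mockito.Mockito

object DoubleFreeCheck {
  trait UnrollMemoryReleaser {
    def releaseUnroll(bytes: Long): Unit
  }

  def main(args: Array[String]): Unit = {
    val releaser = Mockito.mock(classOf[UnrollMemoryReleaser])

    releaser.releaseUnroll(1024L) // freed once the partially-unrolled iterator is fully consumed
    releaser.releaseUnroll(1024L) // freed again by the buffer's dispose/discard path: the bug

    // Fails with TooManyActualInvocations because the release happened twice.
    Mockito.verify(releaser, Mockito.times(1)).releaseUnroll(1024L)
  }
}
```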

Mockito.verify(partiallySerializedBlock.invokePrivate(getRedirectableOutputStream())).close()

val deserializedItems = valuesIterator.toArray.toSeq
Mockito.verify(memoryStore).releaseUnrollMemoryForThisTask(

Reviewer comment (Contributor):

What's the code path that makes releaseUnroll be called, given that the completion callback returned by valuesIterator has changed from discard to dispose?

@JoshRosen (Author):

Any non-freed unroll memory will be automatically freed at the end of the task (as part of the Executor or Task code itself). Before the task has completed, though, PartiallyUnrolledIterator will free the specified amount of unroll memory once the unrolled iterator is fully consumed.
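A minimal sketch of that "release once fully consumed" behavior (a simplified stand-in, not Spark's CompletionIterator or PartiallyUnrolledIterator): the completion callback fires exactly once, when the wrapped iterator is exhausted.

```scala
// Runs `completion` once, the first time the wrapped iterator reports it is exhausted.
class OnExhausted[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var fired = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !fired) { fired = true; completion() }
    more
  }
  override def next(): A = sub.next()
}

object OnExhaustedDemo {
  def main(args: Array[String]): Unit = {
    var unrollMemoryFreed = false
    val it = new OnExhausted(Iterator(1, 2, 3), () => unrollMemoryFreed = true)

    it.foreach(_ => ())        // fully consume the iterator
    assert(unrollMemoryFreed)  // freed mid-task, before the task-completion cleanup
    println("unroll memory released on full consumption")
  }
}
```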

@JoshRosen (Author) left a comment

Thanks for the good review feedback. I think that there might indeed be a pre-existing bug related to off-heap unroll accounting here, so let me try to also catch that via strengthened test cases. I'll update this patch to address your feedback.

@JoshRosen (Contributor, Author) commented Sep 15, 2016

Alright, I've updated this to address the latest round of review feedback. I did manage to spot a memory-accounting problem with off-heap memory because PartiallyUnrolledIterator had hardcoded the use of MemoryMode.ON_HEAP.

(Whoops, didn't mean to submit this comment early; my laptop trackpad glitched out and caused a bunch of spurious mouse clicks).

@JoshRosen JoshRosen closed this Sep 15, 2016
@JoshRosen JoshRosen reopened this Sep 15, 2016
@SparkQA commented Sep 16, 2016

Test build #65465 has finished for PR 15043 at commit 0d70774.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srinathshankar (Contributor) left a comment

This looks good. I think a test dimension with memory mode = OFF_HEAP would be useful.

}
}

test("cannot call valuesIterator() after finishWritingToStream()") {

Reviewer comment (Contributor):

Minor: you can probably combine test("cannot call valuesIterator() more than once") and test("cannot call finishWritingToStream() after valuesIterator()") into one test. The same applies to the calls made after finishWritingToStream().

@@ -33,7 +33,7 @@ class PartiallyUnrolledIteratorSuite extends SparkFunSuite with MockitoSugar {
val rest = (unrollSize until restSize + unrollSize).iterator

val memoryStore = mock[MemoryStore]
-    val joinIterator = new PartiallyUnrolledIterator(memoryStore, unrollSize, unroll, rest)
+    val joinIterator = new PartiallyUnrolledIterator(memoryStore, ON_HEAP, unrollSize, unroll, rest)

Reviewer comment (Contributor):

We should look into trying to test this with OFF_HEAP as well.

@JoshRosen (Contributor, Author) commented:

Jenkins, retest this please.

@JoshRosen (Contributor, Author) commented:

I agree that we should add more off-heap tests, but I'd like to do it in another patch so that we can get this one merged faster to unblock the 2.0.1 RC.

In terms of testing off-heap, I think that one of the best high-level tests / asserts would be to strengthen the releaseUnrollMemory() checks so that inappropriately releasing unroll memory during a task throws an exception during tests. Today there are some circumstances where unroll memory can only be released at the end of a task (such as an iterator backed by an unrolled block that is only partially consumed before the task ends), so the calls to release unroll memory have been tolerant of too much memory being released (they just release min(actualMemory, requestedToRelease)). However, this is only appropriate to do at the end of the task, so we should strengthen the asserts to only allow it there; this would have caught the memory-mode mixup that I fixed here.
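A hedged sketch of the tolerant versus strengthened release logic described above (illustrative names, not Spark's MemoryManager API):

```scala
// Tracks unroll memory held by one task, in bytes.
final class UnrollPool(private var held: Long) {

  // Today's tolerant behavior: over-release is silently clamped to what is actually held.
  def releaseTolerant(requested: Long): Long = {
    val released = math.min(held, requested)
    held -= released
    released
  }

  // Strengthened behavior proposed above: over-release is only legal at task end, so during
  // the task it fails fast and would surface accounting bugs (like a memory-mode mixup) in tests.
  def releaseStrict(requested: Long, atTaskEnd: Boolean): Long = {
    require(atTaskEnd || requested <= held,
      s"tried to release $requested bytes but only $held bytes of unroll memory are held")
    releaseTolerant(requested)
  }
}
```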

I'm going to retest this and if it passes tests then I'll merge to master and branch-2.0. I'll add the new tests described above in a followup.

@SparkQA commented Sep 17, 2016

Test build #65541 has finished for PR 15043 at commit 0d70774.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor, Author) commented:

I believe that this latest test failure is caused by a known flaky PySpark test, so I'm going to merge this now and will monitor tests afterwards.

asfgit pushed a commit that referenced this pull request Sep 17, 2016
[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.

(cherry picked from commit 8faa521)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
@asfgit asfgit closed this in 8faa521 Sep 17, 2016
@JoshRosen JoshRosen deleted the partially-serialized-block-values-iterator-bugfix branch September 17, 2016 18:48
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.