Spark 3.2: Commit consumed offsets to the checkpoint location #4473
Conversation
There should be a test with this patch that illustrates the issue being fixed, if possible.
@RussellSpitzer - thanks a lot for your review. The challenge with unit testing this is that the SparkSession holds on to the stream's state internally, which makes the issue non-reproducible with a single SparkSession. We saw this issue when our streams on an EMR cluster had to be brought down and restarted on another EMR cluster, and we were able to verify this fix using the same approach. I put the fix out before writing that unit test, for visibility, so people can port it as needed. This bug is essentially a time bomb in the code (it goes off the moment a stream moves across clusters) for anyone who uses readStream on an Iceberg table and has the first snapshot expired, which is almost everyone who uses this! Let me try the two-SparkSession unit test and get back. Setting aside the reproduction problem explained above, the code itself has coverage in this unit test. cc: @rajarshisarkar
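For reference, a minimal two-session sketch of the restart scenario described above (the table name, checkpoint path, console sink, and Trigger.Once() trigger are illustrative assumptions, not the actual test):

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;

    public class TwoSessionRestartSketch {

      public static void main(String[] args) throws Exception {
        // First "cluster": run one micro-batch and let Spark checkpoint the consumed offset.
        SparkSession first = SparkSession.builder().master("local[2]").appName("first").getOrCreate();
        runOnce(first);
        first.stop();  // drops the in-memory stream state held by this session

        // Second "cluster": a fresh session must recover purely from the checkpoint
        // location, which is where the expired-initial-snapshot problem shows up.
        SparkSession second = SparkSession.builder().master("local[2]").appName("second").getOrCreate();
        runOnce(second);
        second.stop();
      }

      private static void runOnce(SparkSession spark) throws Exception {
        StreamingQuery query = spark.readStream()
            .format("iceberg")
            .load("db.events")                                 // illustrative table name
            .writeStream()
            .format("console")
            .option("checkpointLocation", "/tmp/ckpt/events")  // shared across both runs
            .trigger(Trigger.Once())
            .start();
        query.awaitTermination();
      }
    }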
@SreeramGarlapati can you please update the description with a clear statement of the bug and how this fixes it?
I agree with @RussellSpitzer. This should have a test to verify the behavior.
    -  private void writeOffset(StreamingOffset offset, OutputFile file) {
    -    try (OutputStream outputStream = file.create()) {
    +  private void writeOffset(StreamingOffset offset) {
    +    OutputFile file = io.newOutputFile(initialOffsetLocation);
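For readers without the full file open, a sketch of what the revised method could look like after this hunk. The body is reconstructed around the two changed lines; the use of offset.json(), the UTF-8 writer, and createOrOverwrite() (instead of the original create()) are assumptions inferred from the surrounding code and the discussion below, not visible in the hunk itself:

      private void writeOffset(StreamingOffset offset) {
        OutputFile file = io.newOutputFile(initialOffsetLocation);
        // Assumption: an overwrite-capable write, so a restarted driver can rewrite
        // the offset file instead of failing when it already exists.
        try (OutputStream outputStream = file.createOrOverwrite()) {
          BufferedWriter writer =
              new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
          writer.write(offset.json());
          writer.flush();
        } catch (IOException ioException) {
          throw new UncheckedIOException(
              String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
        }
      }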
Thanks @SreeramGarlapati !!!
[question] I have a doubt: can multiple streams writing to the same checkpoint location result in a nondeterministic state?
Let's say the initial snapshot id was snapshot(1),
and the table state eventually becomes snapshot(1) -> snapshot(2) -> snapshot(3) -> snapshot(4).
Now stream 1 starts in cluster 1 and reads/commits up to snapshot(2).
At the same time, stream 2 starts in cluster 2 from snapshot(2). Before it can commit snapshot(3), stream 1 commits snapshot(3) and snapshot(4); when stream 2 then commits snapshot(3), it overwrites stream 1's state (or vice versa). That makes the starting state of a new stream, say stream 3 in cluster 3, nondeterministic, because one stream is running ahead of the other (stream 1 / stream 2).
Also, earlier, if two streams had started at the same time (with no previous checkpoint/offset file), one would have failed since we did a file.create(); now they can co-exist. Your thoughts?
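To make the file.create() point concrete, a small standalone sketch of the two Iceberg OutputFile write paths (the HadoopFileIO setup and the offset path are illustrative placeholders):

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hadoop.HadoopFileIO;
    import org.apache.iceberg.io.OutputFile;

    public class OffsetFileWriteSketch {
      public static void main(String[] args) throws Exception {
        HadoopFileIO io = new HadoopFileIO(new Configuration());
        OutputFile offsetFile = io.newOutputFile("/tmp/ckpt/sources/0/offsets/0");

        // create() refuses to clobber an existing file (it throws AlreadyExistsException),
        // so two writers racing on the same offset file cannot both succeed.
        try (OutputStream out = offsetFile.create()) {
          out.write("{\"version\":1}".getBytes(StandardCharsets.UTF_8));
        }

        // createOrOverwrite() silently lets the last writer win, which is the
        // co-existence concern raised above for streams sharing one checkpoint location.
        try (OutputStream out = offsetFile.createOrOverwrite()) {
          out.write("{\"version\":1}".getBytes(StandardCharsets.UTF_8));
        }
      }
    }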
@singhpk234 - multiple Spark streaming clusters cannot run against the same checkpoint location.
Closing this, as Spark 3.2 is no longer supported in the latest version. If the issue exists on another Spark version, please address it there.
This is a port of a bug fix from our internal fork. Without it, our table maintenance expired the initial snapshot that our streaming queries had checkpointed, which in turn left our streaming readers in an unrecoverable state.
cc: @rajarshisarkar, @daksha121, @rdblue, @aokolnychyi, @RussellSpitzer
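As reference only, a minimal sketch of the maintenance step that triggers the unrecoverable state (the table name is illustrative, and Spark3Util.loadIcebergTable is just one way to obtain the Table handle):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.spark.sql.SparkSession;

    public class ExpireInitialSnapshotSketch {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().getOrCreate();
        Table table = Spark3Util.loadIcebergTable(spark, "db.events");  // illustrative table

        // Routine maintenance: expire everything but the latest snapshot. If a streaming
        // reader's checkpoint still references the table's first snapshot, its next
        // restart can no longer resolve that snapshot id.
        table.expireSnapshots()
            .expireOlderThan(System.currentTimeMillis())
            .commit();
      }
    }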