
Spark 3.2: Commit consumed offsets to the checkpoint location #4473

Conversation


@SreeramGarlapati SreeramGarlapati commented Apr 2, 2022

This is a port of a bug fix from our internal fork. Without it, our table maintenance expired the initial snapshot recorded in the stream's checkpoint, which in turn left our streaming readers in an unrecoverable state.
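
For context, a minimal, hedged sketch of where "committing consumed offsets" hooks into Spark 3.2's DataSource V2 streaming API; the class and the persistConsumedOffset helper below are illustrative assumptions, not the actual patch:

import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Hedged sketch, not the actual patch: Spark calls commit(end) once a
// micro-batch's end offset has been durably recorded in the query's
// checkpoint, so a source can persist "consumed up to here" and a restarted
// query no longer has to fall back to the table's initial snapshot (which
// table maintenance may already have expired).
abstract class OffsetCommittingStream implements MicroBatchStream {

  @Override
  public void commit(Offset end) {
    // Record the last consumed offset, e.g. as a small file under the
    // stream's checkpoint location.
    persistConsumedOffset(end);
  }

  // Hypothetical helper; the mechanism discussed later in this thread is
  // writing the offset file with createOrOverwrite().
  abstract void persistConsumedOffset(Offset end);
}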

cc: @rajarshisarkar, @daksha121, @rdblue, @aokolnychyi, @RussellSpitzer

@github-actions github-actions bot added the spark label Apr 2, 2022
@SreeramGarlapati SreeramGarlapati changed the title from "Spark3: implement micro-batch streaming read - commitOffset" to "Spark3.2: implement micro-batch streaming read - commitOffset" Apr 2, 2022

@RussellSpitzer RussellSpitzer left a comment

There should be a test with this patch that illustrates the issue being fixed, if possible.


SreeramGarlapati commented Apr 3, 2022

@RussellSpitzer - thanks a lot for your review.

The challenge with unit testing this is that the SparkSession holds on to the stream's state internally, which makes the issue non-reproducible with a single SparkSession. We saw this issue when our streams on one EMR cluster had to be brought down and restarted on another EMR cluster, and we were able to verify the fix using the same approach.
So, all in all, I would need a unit test that spans two SparkSessions. The way the tests are written needs a bit of refactoring to use two Spark sessions, and even then I am unsure whether it will reproduce the issue (given that the sessions share a process). Is there any precedent in the codebase for this pattern that you could kindly point me to?

I have put the fix up before having that unit test, for visibility, so people can port it as needed. This bug is essentially a time bomb in the code: it goes off the moment a stream moves across clusters for anyone using readStream on an Iceberg table whose first snapshot has expired, which is almost everyone who uses this!

Let me try that two-SparkSession unit test and get back to you.

Putting the problem explained above aside, the code itself is covered by the existing unit test testResumingStreamReadFromCheckpoint, which I used to debug through and fix the code; the fix is to use file.createOrOverwrite() instead of the existing file.create(), as sketched below.
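
For illustration, a minimal sketch of that change, assuming Iceberg's OutputFile API; the wrapper class and error message here are placeholders, not the exact method body in this PR:

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.apache.iceberg.io.OutputFile;

// Sketch only: write the serialized offset with createOrOverwrite() so that a
// stream restarted elsewhere (e.g. on another cluster) can rewrite the offset
// file instead of failing because create() refuses to replace an existing file.
final class OffsetFileWriter {
  private OffsetFileWriter() {}

  static void writeOffset(String offsetJson, OutputFile file) {
    try (OutputStream outputStream = file.createOrOverwrite()) {
      outputStream.write(offsetJson.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write offset to " + file.location(), e);
    }
  }
}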

cc: @rajarshisarkar


rdblue commented Apr 3, 2022

@SreeramGarlapati can you please update the description with a clear statement of the bug and how this fixes it?

@rdblue rdblue changed the title from "Spark3.2: implement micro-batch streaming read - commitOffset" to "Spark 3.2: Commit consumed offsets to the checkpoint location" Apr 3, 2022

rdblue commented Apr 3, 2022

I agree with @RussellSpitzer. This should have a test to verify the behavior.

Code under review (diff excerpt, removed lines first, added lines after):
-  private void writeOffset(StreamingOffset offset, OutputFile file) {
-    try (OutputStream outputStream = file.create()) {
+  private void writeOffset(StreamingOffset offset) {
+    OutputFile file = io.newOutputFile(initialOffsetLocation);

@singhpk234 singhpk234 Apr 21, 2022

Thanks @SreeramGarlapati !!!

[question] I have a doubt: can multiple streams writing to the same checkpoint location result in a nondeterministic state?

Let's say the initial snapshot id was snapshot(1), and the table state eventually becomes snapshot(1) -> snapshot(2) -> snapshot(3) -> snapshot(4).
Now stream 1 starts in cluster 1 and reads/commits up to snapshot(2). At the same time, stream 2 starts in cluster 2 from snapshot(2); before it can commit snapshot(3), stream 1 commits snapshot(3) and snapshot(4). When stream 2 then tries to commit snapshot(3), it overwrites the state of stream 1 (or vice versa), which makes the starting state of a new stream, say stream 3 in cluster 3, nondeterministic, since one stream is running ahead of the other (stream 1 / stream 2).

Also, earlier, if two streams had started at the same time (with no previous checkpoint / offset file), one would have failed because we did a file.create(); now they can co-exist. Your thoughts?

@SreeramGarlapati (Collaborator, Author) replied:

@singhpk234 - multiple Spark streaming clusters cannot run against the same checkpoint location.

@ajantha-bhat

Closing this as Spark 3.2 is no longer supported in the latest version.

If the issue exists on other Spark versions, please handle it there.
