
Spark3 structured streaming micro_batch read support #2660

Merged: 40 commits, Jun 25, 2021

Conversation

@SreeramGarlapati (Collaborator) commented Jun 2, 2021

This work extends the idea in issue #179 and the Spark2 work done in PR #2272; the difference is that this PR targets Spark3.

Implementation Notes:

  • If an Iceberg table has only snapshots of type append, spark3 readStream on that table returns all the files in the first snapshot, followed by the data files added in each subsequent snapshot (see the usage sketch below).
  • If the reader encounters any snapshot of a type other than append, the reader fails.
  • For checkpointing the offset, this implementation plugs into the default Spark3 offset checkpointing. The reader only maintains the initial offset; the implementation & reasoning are inspired by Spark's Kafka micro-batch source (KafkaMicroBatchStream).
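For reference, consuming the table as a micro-batch stream looks roughly like the sketch below. The table identifier, checkpoint path, and console sink are placeholders rather than anything defined in this PR; treat it as a minimal usage sketch, not the canonical example.

```java
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class IcebergReadStreamSketch {
  public static void main(String[] args) throws TimeoutException, StreamingQueryException {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-microbatch-read")   // placeholder app name
        .getOrCreate();

    // Read the Iceberg table as a stream of micro-batches ("db.table" is a placeholder).
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("db.table");

    // Write the stream somewhere; the console sink and checkpoint path are placeholders.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/iceberg-stream-checkpoint")
        .start();

    query.awaitTermination();
  }
}
```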

What's NEXT!?

  • Change the unit tests that rely on Table APIs to simulate generating delete/overwrite snapshots so that they use SQL statements instead.
  • Handle snapshots of type DELETE, REPLACE & OVERWRITE.
  • Refactor the SparkBatchScan class:
    1. Separate Batch & Scan into different implementations.
    2. Draw a cleaner line between the SparkBatchScan & MicroBatchStreaming classes.
  • Plug into Spark stats estimation for MicroBatchStreaming.
  • Implement full scan mode on the first snapshot.
  • Documentation update.
  • Rate limiting.
  • Accept a different FileIO implementation for checkpoints.
  • Columnar reads are not enabled; something for the future.

cc: @aokolnychyi & @RussellSpitzer & @holdenk @rdblue @rdsr @jackye1995 #2611

Concepts used in this PR

Background on Spark 3 - MicroBatchStream

  1. Spark expects a micro-batch streaming source to implement the interface MicroBatchStream (outlined in the sketch after this list).
  2. The Spark driver, as part of MicroBatchExecution, instantiates Iceberg's MicroBatchStream implementation & invokes the methods of this class in the following order:
    - latestOffset() - asks the Iceberg streaming source for the latest offset available on the stream.
    - initialOffset() - asks the Iceberg streaming source for the very first offset at which this stream started.
    - planInputPartitions(start, end) - the Spark runtime picks a start offset and an end offset based on the stream position it is at, & asks the Iceberg streaming source to return an InputPartition[], which is later distributed across Spark executors.
    - createReaderFactory() - the Iceberg micro-batch streaming source implements this method to tell Spark executors how to read a given InputPartition.
    - There are other standard methods on the MicroBatchStream interface that Spark invokes - deserializeOffset, commit, stop - which are self-explanatory.
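The sketch below is a paraphrased outline of that contract, with comments summarizing the descriptions above; the authoritative definition is Spark's org.apache.spark.sql.connector.read.streaming.MicroBatchStream.

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Paraphrased outline of Spark 3's MicroBatchStream contract; the comments are mine.
interface MicroBatchStreamOutline {
  Offset latestOffset();                    // newest offset currently available on the stream
  Offset initialOffset();                   // the very first offset at which the stream started
  InputPartition[] planInputPartitions(Offset start, Offset end);  // work units handed to executors
  PartitionReaderFactory createReaderFactory();  // tells executors how to read an InputPartition
  Offset deserializeOffset(String json);    // rebuild an offset from its checkpointed JSON form
  void commit(Offset end);                  // everything up to `end` has been processed
  void stop();                              // release any resources held by the stream
}
```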

Iceberg's Spark stream offset - StreamingOffset

Spark expects the streaming source to implement the Offset class. An Offset is a logical representation of the position of the stream at which Spark is reading.
Iceberg's implementation (which already existed before this PR) is StreamingOffset.
It stores 3 properties about the Iceberg table position (see the simplified sketch after this list):

  1. snapshotId: the Iceberg table snapshot id.
  2. position: the file position within that snapshot.
  3. scanAllFiles: whether to stream
    • all files in that snapshot - which includes (a) files added in older snapshots + (b) the net-new files added in the current snapshot,
    • or only the files that are newly added in the current snapshot.
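A simplified stand-in for that class is sketched below. Field and accessor names are illustrative; the real StreamingOffset also knows how to serialize itself to and from JSON so that Spark can checkpoint it.

```java
// Simplified stand-in for Iceberg's StreamingOffset; illustrative only.
public final class SimpleStreamingOffset {
  private final long snapshotId;       // the Iceberg table snapshot id
  private final long position;         // file position within that snapshot
  private final boolean scanAllFiles;  // true: read every file in the snapshot;
                                       // false: read only the files this snapshot added

  public SimpleStreamingOffset(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }

  public long snapshotId() { return snapshotId; }
  public long position() { return position; }
  public boolean shouldScanAllFiles() { return scanAllFiles; }
}
```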

Iceberg's MicroBatchStream implementation

For example, if an Iceberg table has snapshots S1, S2, S3, each containing 3 files, then the MicroBatchStream implementation would return:

  • latestOffset() -> [S3, 3, false]
  • initialOffset() -> [S1, 0, true]

If a new snapshot S4 with 5 files in it is added to the Iceberg table, then subsequent invocations on the MicroBatchStream interface would return:

  • latestOffset() -> [S4, 5, false] --> note that scanAllFiles is false here; at this point we need to recognize that snapshot S4 is not the snapshot S1 from initialOffset, and hence reading just the newly added files in S4 is good enough. In other words, the latestOffset we return to Spark has to be compared against the starting offset.
  • initialOffset() -> [S1, 0, true]

This should hold true even when the Spark cluster running this readStream on the Iceberg table dies and comes back up. But what if snapshot S1 has expired, and when the Spark cluster comes back up we no longer have the actual initialOffset at which this readStream started?
Answer: this is exactly why the current implementation preserves/checkpoints the initialOffset!
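A rough sketch of that latestOffset() reasoning, using the simplified offset class from the earlier sketch, is below. It is not the code from this PR: the "added-data-files" snapshot-summary lookup and the null return for an empty table are illustrative simplifications.

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Illustrative sketch, not the implementation in this PR.
public class LatestOffsetSketch {
  static SimpleStreamingOffset latestOffset(Table table, SimpleStreamingOffset initialOffset) {
    table.refresh();
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      return null;  // empty table: nothing to stream yet
    }

    // Number of files added by the current snapshot, read from the snapshot summary
    // here purely for illustration.
    Map<String, String> summary = current.summary();
    long addedFiles = Long.parseLong(summary.getOrDefault("added-data-files", "0"));

    // Scan all files only while the latest snapshot is still the snapshot the stream
    // started from; once newer snapshots exist, the newly added files are enough.
    boolean scanAllFiles = initialOffset != null
        && current.snapshotId() == initialOffset.snapshotId();
    return new SimpleStreamingOffset(current.snapshotId(), addedFiles, scanAllFiles);
  }
}
```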

**In the current implementation:**
* An Iceberg snapshot is the upper bound for a micro-batch. A given micro-batch only spans within a single snapshot; it is never composed of multiple snapshots. The batch size is used to limit the number of files within a given snapshot (see the conceptual sketch after this list).
* The streaming reader errors out if it encounters any snapshot of a type other than `APPEND`.
* Handling `DELETE`, `REPLACE` & `OVERWRITE` snapshots is something for the future.
* Columnar reads are not enabled. Something for the future.
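A purely conceptual sketch of the "a micro-batch never crosses a snapshot" rule is below; the helper, its parameters, and the batch-size handling are illustrative assumptions, not the logic from this PR.

```java
// Conceptual sketch only: the end offset of a micro-batch stays inside the snapshot
// the batch starts in, and a batch-size cap limits how many files are taken.
public class MicroBatchBoundarySketch {
  static SimpleStreamingOffset endOffsetFor(SimpleStreamingOffset start,
                                            long filesInStartSnapshot,  // assumed known
                                            long maxFilesPerBatch) {    // the batch size
    // Advance at most maxFilesPerBatch files, and never beyond the number of files
    // the start snapshot exposes, so the batch stays within a single snapshot.
    long endPosition = Math.min(start.position() + maxFilesPerBatch, filesInStartSnapshot);
    return new SimpleStreamingOffset(start.snapshotId(), endPosition, start.shouldScanAllFiles());
  }
}
```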
@tprelle (Contributor) commented Jun 3, 2021

For an incremental read, I understand why, as a first implementation, you do not support OVERWRITE snapshots. But for an incremental read implementation, shouldn't REPLACE or DELETE snapshots simply be ignored and not propagated?

github-actions bot added the core label on Jun 4, 2021
@SreeramGarlapati (Collaborator, Author)

> For an incremental read, I understand why, as a first implementation, you do not support OVERWRITE snapshots. But for an incremental read implementation, shouldn't REPLACE or DELETE snapshots simply be ignored and not propagated?

@tprelle - your suggestion totally makes sense. I am definitely planning to implement ignoring deletes & replaces, and I will introduce a Spark option for it in my next PR. Please refer to our discussion here: #2611 (comment).

Overall, the principle I am trying to follow - which also came as a suggestion from @rdblue - is to keep the PRs as small and yet as useful as possible, to keep the cognitive load on reviewers minimal.

@tprelle (Contributor) commented Jun 4, 2021

Hi @SreeramGarlapati, thanks for pointing me to the #2611 discussion.
I was looking for a way to implement incremental reads on overwrite snapshots (currently, merge statements with at least one update clause produce an overwrite snapshot because of the copy-on-write implementation).
Do you have an idea?

use delete from statement instead of deletion using iceberg Table Api.
… - deletion using iceberg Table Api. CI is unhappy with the delete from SQL statement, which needs some debugging.
// this should create a snapshot with type Replace.
table.rewriteManifests()
    .clusterBy(f -> 1)
    .commit();
(Contributor)

It should always be safe to simply skip replace commits because they should never change the data in the table, just the data files or manifests. It's fine to fail for now, but I think you'd want to support skipping these pretty quickly so you can support compaction in the background.
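Skipping replace snapshots amounts to checking the snapshot's operation; a minimal illustrative check (not code from this PR, and the helper name is made up) might look like:

```java
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;

// Illustrative helper: replace snapshots rewrite data files/manifests without changing
// the table's data, so a streaming reader can safely pass over them.
public class ReplaceSnapshotFilter {
  static boolean shouldSkip(Snapshot snapshot) {
    return DataOperations.REPLACE.equals(snapshot.operation());
  }
}
```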

(Collaborator, Author)

Yes, this is at the top of my What's NEXT list: a Spark option to ignore replaces.

@rdblue (Contributor) commented Jun 24, 2021

@SreeramGarlapati, this looks great. My only concern is that startSpark/stopSpark duplicates the code in SparkTestBase. I think we should configure the Spark session just once.

@rdblue rdblue merged commit 9cfcf5c into apache:master Jun 25, 2021
@rdblue (Contributor) commented Jun 25, 2021

Merged! Thanks @SreeramGarlapati for pushing this through. And thanks to @RussellSpitzer, @holdenk, @jackye1995, and @aokolnychyi for reviewing!

@rdblue (Contributor) commented Jun 25, 2021

@daksha121, sorry that I missed that you had a commit in here, too. I should have been more careful with the commit message to make sure you were listed as a co-author. Thanks for contributing here!

@davseitsev

@SreeramGarlapati thank you very much for your work. I have a few questions.

We have a Spark streaming job that reads data from Kafka, processes it, and stores it to an Iceberg table partitioned by day.
There is a background compaction process and a scheduled cleanup task that expires old snapshots to remove old small files.
I want to build another streaming job that reads a few tables produced by the first job, unions them, filters the necessary rows, and stores the data to an Iceberg table.
Thus I'd like to understand better how expired snapshots are handled.

In our case the source table contains append and replace snapshots.
MicroBatchStream.initialOffset() always returns a StreamingOffset with scanAllFiles=true to process historical data. As old snapshots are expired by the cleanup process, we can get into the case where the first snapshot is of type replace. Due to #2752 we ignore replace snapshots. Will this lead to a situation where we skip the initial snapshot with scanAllFiles=true and lose all the data appended in old (expired) snapshots?

And one more question.
Let's say we have data in the source table for 1 year, expire snapshots older than 7 days, and the cleanup job runs every hour.
If a job starts reading this table from the initial offset, it has at most 1 hour to process the first snapshot, doesn't it? As the initial snapshot is processed with scanAllFiles=true, it's the biggest one, because it contains the data for 1 year minus 7 days. If I'm correct, there is a big chance that the streaming job will fail in the middle when the cleanup job runs, because it will expire the snapshot being processed by the streaming job.
Would it work if initialOffset() returned the latest snapshot with the scanAllFiles flag set, instead of the first snapshot?

  • latestOffset() -> [LatestSnapshot, LatestFile, false]
  • initialOffset() -> [LatestSnapshot, 0, true]

Probably in this case we would have 7 days to process the initial snapshot.

Are there any other corner cases with snapshot expiration?
