Spark3 structured streaming micro_batch read support #2660
Conversation
This work is an extension of the idea in issue apache#179 & the Spark2 work done in PR apache#2272 - only that - this is for Spark3.

**In the current implementation:**
* An Iceberg Snapshot is the upper bound for a MicroBatch. A given MicroBatch will only span a single Snapshot; it will not be composed of multiple Snapshots. BatchSize is used to limit the number of files within a given snapshot.
* The streaming reader will error out if it encounters any Snapshot of a type NOT EQUAL to `APPEND`.
* Handling `DELETES`, `REPLACE` & `OVERWRITES` is left for the future.
* Columnar reads are not enabled; also left for the future.
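For orientation, here is a minimal sketch of how such a micro-batch read might be started against an Iceberg table from Spark 3. The table name `db.events` and the console sink are illustrative placeholders, not part of this PR.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IcebergStreamReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-micro-batch-read")
        .getOrCreate();

    // Read the Iceberg table as a streaming source; per the description above,
    // each micro-batch is bounded by a single snapshot.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("db.events");  // hypothetical table identifier

    // Write the stream somewhere observable; the console sink is only for illustration.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .outputMode("append")
        .start();

    query.awaitTermination();
  }
}
```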
For an incremental read, I understand why, as a first implementation, you do not support OVERWRITES snapshots. But as an incremental read implementation, shouldn't REPLACE or DELETES just be ignored and not propagated?
@tprelle - your suggestion totally makes sense. I am definitely planning to implement it. Overall, the principle that I am trying to follow - which also came as a suggestion from @rdblue - is that we want to keep the PRs as small and yet as useful as possible, to keep the cognitive load on the reviewers minimal.
Hi @SreeramGarlapati, thanks for pointing me to the #2611 discussion.
…ad of potentially iterating thru snapshot.addedFiles list
refactor stopStreams into an @after method
use a DELETE FROM statement instead of deletion using the Iceberg Table API.
… - deletion using the Iceberg Table API. CI is unhappy with the DELETE FROM SQL statement - which needs some debugging.
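For reference, a sketch of the two deletion approaches these commits are toggling between. The table name and predicate are placeholders, not the code used in this PR's tests, and running `DELETE FROM` through Spark SQL assumes the Iceberg SQL extensions are enabled.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.spark.sql.SparkSession;

public class DeleteApproaches {
  // Approach 1: a SQL DELETE FROM statement issued through Spark
  // (placeholder table name and predicate).
  static void deleteViaSql(SparkSession spark) {
    spark.sql("DELETE FROM db.events WHERE id < 10");
  }

  // Approach 2: the Iceberg Table API, which removes the data files
  // matching the given row filter in a new delete snapshot.
  static void deleteViaTableApi(Table table) {
    table.newDelete()
        .deleteFromRowFilter(Expressions.lessThan("id", 10))
        .commit();
  }
}
```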
```java
// this should create a snapshot with type Replace.
table.rewriteManifests()
    .clusterBy(f -> 1)
    .commit();
```
It should always be safe to simply skip `replace` commits because they should never change the data in the table, just the data files or manifests. It's fine to fail for now, but I think you'd want to support skipping these pretty quickly so you can support compaction in the background.
Yes, this is at the top of my What's Next list: a Spark option to ignore `replace` snapshots.
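A rough sketch of what such a skip could look like; the helper name `shouldSkip` is hypothetical, and this is not the logic added in this PR.

```java
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;

class SnapshotFilter {
  // Replace snapshots (e.g. manifest rewrites or file compaction) rewrite metadata
  // or data files without changing table contents, so a streaming reader could
  // safely skip them instead of failing.
  static boolean shouldSkip(Snapshot snapshot) {
    return DataOperations.REPLACE.equals(snapshot.operation());
  }
}
```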
@SreeramGarlapati, this looks great. My only concern is that …
Merged! Thanks @SreeramGarlapati for pushing this through. And thanks to @RussellSpitzer, @holdenk, @jackye1995, and @aokolnychyi for reviewing!
@daksha121, sorry that I missed that you had a commit in here, too. I should have been more careful with the commit message to make sure you were listed as a co-author. Thanks for contributing here!
…)" This reverts commit 9cfcf5c.
…pache#2660)"" This reverts commit 1eade43.
@SreeramGarlapati thank you very much for your work. I have a few questions. We have a Spark streaming job which reads data from Kafka, processes it, and stores it to an Iceberg table partitioned by day. In our case the source table contains … And one more question:
Are there any other corner cases with snapshot expiration?
This work is an extension of the idea in issue #179 & the Spark2 work done in PR #2272 - only that - this is for Spark3.
**Implementation Notes:**
* Only Snapshots of type `Append` are supported: running `spark3.readStream...` on that Iceberg table will return all files in the first Snapshot, followed by all the data files added in the subsequent Snapshots.
* If a Snapshot of type `NotEqual` to `Append` is encountered, the reader fails.

**What's NEXT!?**
* Handling `DELETES`, `REPLACE` & `OVERWRITES`.
* Refactoring the `SparkBatchScan` class: split `Batch` & `Scan` into different implementations, the `SparkBatchScan` & `MicroBatchStreaming` classes.
* Columnar reads for `MicroBatchStreaming`.
cc: @aokolnychyi & @RussellSpitzer & @holdenk @rdblue @rdsr @jackye1995 #2611
**Concepts used in this PR**

**Background on Spark 3 - MicroBatchStream**

Spark's `MicroBatchExecution` instantiates Iceberg's `MicroBatchStream` implementation & invokes the methods on this `MicroBatchStream` class in the below order:
* `latestOffset()` - asks the Iceberg streaming source to return the latest offset available on the stream.
* `initialOffset()` - asks the Iceberg streaming source to return the first ever offset at which this stream started.
* `planInputPartitions(start, end)` - the Spark runtime picks a `start` offset and an `end` offset based on the stream position it is at, & asks the Iceberg streaming source to return a data structure, `InputPartition[]`, which will later be distributed across Spark executors.
* `createReaderFactory()` - the Iceberg micro-batch streaming source should implement this method to educate the Spark executor as to how to read a given `InputPartition`.
* There are other standard methods on the `MicroBatchStream` interface - which will be invoked by Spark - like `deserializeOffset`, `commit`, `stop` - which are self explanatory.
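For orientation, a skeleton of the `MicroBatchStream` contract described above. The class name `IcebergMicroBatchSketch` and the empty bodies are placeholders; this is not the PR's `SparkMicroBatchStream` implementation.

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Placeholder skeleton showing the methods Spark calls, roughly in the order
// described above; a real implementation fills these in using Iceberg
// snapshots and a StreamingOffset.
public class IcebergMicroBatchSketch implements MicroBatchStream {

  @Override
  public Offset latestOffset() {
    // Newest position available in the table (e.g. last snapshot, last file index).
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Offset initialOffset() {
    // Position at which the stream first started.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    // Splits the (start, end] range into partitions to distribute across executors.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // Tells executors how to read a given InputPartition.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Offset deserializeOffset(String json) {
    // Rebuilds an Offset from its checkpointed JSON form.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void commit(Offset end) {
    // Called once a batch up to `end` has been fully processed.
  }

  @Override
  public void stop() {
  }
}
```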
**Iceberg's Spark stream offset - StreamingOffset**

Spark expects the streaming source to implement an `Offset` class. `Offset` is a logical representation of the position of the stream at which Spark is reading. Iceberg's implementation (which already existed before this PR) is `StreamingOffset`. It stores 3 properties about the Iceberg table position: the snapshot id being read, the position (file index) within that snapshot, and a `scanAllFiles` flag indicating whether to scan all files in the snapshot or only the newly added ones.
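A rough sketch of the shape of such an offset, following the three properties above. This is an illustrative value class, not Iceberg's actual `StreamingOffset` source.

```java
import org.apache.spark.sql.connector.read.streaming.Offset;

// Illustrative only: a value type carrying the three pieces of position information.
public class StreamingOffsetSketch extends Offset {
  private final long snapshotId;      // which Iceberg snapshot the stream is positioned in
  private final long position;        // index of the file within that snapshot
  private final boolean scanAllFiles; // true only when the whole snapshot must be scanned

  public StreamingOffsetSketch(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }

  @Override
  public String json() {
    // Spark persists offsets to the checkpoint as JSON; a real implementation
    // would round-trip all three fields through deserializeOffset.
    return String.format(
        "{\"snapshot_id\":%d,\"position\":%d,\"scan_all_files\":%b}",
        snapshotId, position, scanAllFiles);
  }
}
```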
**Iceberg's MicroBatchStream implementation**

For example, if an Iceberg table has Snapshots S1, S2, S3 - with 3 files in each snapshot - then the `MicroBatchStream` implementation would return:

* `latestOffset()` -> [S3, 3, false]
* `initialOffset()` -> [S1, 0, true]

If a new Snapshot S4 is added to the Iceberg table, with 5 files in it, then the subsequent invocations on the `MicroBatchStream` interface would return:

* `latestOffset()` -> [S4, 5, false] --> please note that `scanAllFiles` is false here; at this point we need to differentiate that this Snapshot S4 is not the snapshot S1 from `initialOffset`, & hence reading just the NEWLY added files in this snapshot S4 is good enough. So, we need to check that the latestOffset we are returning to Spark is not the starting offset!
* `initialOffset()` -> [S1, 0, true]

This should hold true even in the case when the Spark cluster running this `readStream` on the Iceberg table dies and comes back up. But what if Snapshot S1 expired, and when the Spark cluster died and came back up, we no longer have the actual `initialOffset` on which this `readStream` started?

Answer: this is the reason why the current implementation preserves/checkpoints `initialOffset`!!
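That checkpointing rides on Spark's standard streaming checkpoint. A sketch of wiring it up follows; the table name, sink format, output path, and checkpoint path are all placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class CheckpointedIcebergStream {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("db.events");  // hypothetical table identifier

    // The checkpoint location is where Spark persists stream progress, including
    // the offsets discussed above, so a restarted query can resume where it
    // left off rather than re-deriving its starting position from the table.
    StreamingQuery query = stream.writeStream()
        .format("parquet")
        .option("path", "/tmp/iceberg-stream-out")                 // placeholder output path
        .option("checkpointLocation", "/tmp/iceberg-stream-ckpt")  // placeholder checkpoint path
        .start();

    query.awaitTermination();
  }
}
```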