Description
This is a continuation of the work done in PR #2660: reading from an Iceberg table as an incremental streaming source in Spark.
There is also an idea proposed by @aokolnychyi - to use the limit API once we move to Spark 3.1.
[Please refer to the Concepts section to fully understand the issue]
The Problem
The current implementation of the Iceberg DSv2 micro_batch source hands off all data available on the table to Spark 3 DSv2 in a single batch. This can result in highly unpredictable/irregular micro-batch sizes. This issue asks to explore which concepts are available in Iceberg to rate-limit the stream size.
Implementation proposal
Flow control can be maintained at the file level or at the record level. Iceberg's table metadata layer mainly operates at the file level - i.e., each table snapshot is a list of data (and delete) files. While an ideal implementation of flow control would operate at the record level, keeping in mind the ROI, the complexity of implementation, and our current requirement, file-level flow control is enough.
The implementation idea is:
- Introduce a Spark option to enable rate limiting - `max-bytes-per-micro-batch` - which switches the source to this implementation.
- Change the implementation of `SparkMicroBatchStream` to remember the table's stream position - the `latestReturnedOffset`.
- Change the implementation of `latestOffset()` to NOT return the Iceberg table's actual `latestOffset`, but to apply flow control and work out what the new `latestReturnedOffset` can be (see the sketch after this list).
- Change the OffsetStoreProvider to also record the `latestCommittedOffset`, so that the stream can be resumed.
Concepts needed to grok the above idea
Background on Spark 3 - MicroBatchStream
- Spark expects a micro-batch streaming source to implement the interface `MicroBatchStream`.
- The Spark driver - as part of `MicroBatchExecution` - instantiates Iceberg's `MicroBatchStream` implementation and invokes the methods of this `MicroBatchStream` class in the below order:
  - `latestOffset()` - asks the Iceberg streaming source to return the `latestOffset` available on the stream.
  - `initialOffset()` - asks the Iceberg streaming source to return the very first `initialOffset` from when the stream started.
  - `planInputPartitions(start, end)` - the Spark runtime picks a `start` offset and an `end` offset based on the stream position it is at, and asks the Iceberg streaming source to return an `InputPartition[]`, which will later be distributed across the Spark executors.
  - `createReaderFactory()` - the Iceberg micro-batch streaming source should implement this method to tell the Spark executors how to read a given `InputPartition`.
- There are other standard methods on the `MicroBatchStream` interface that Spark invokes - like `deserializeOffset`, `commit`, `stop` - which are self-explanatory (a bare skeleton follows below).
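For orientation, here is a skeleton of the `MicroBatchStream` contract as Spark 3.x defines it (method names per the DSv2 API in `org.apache.spark.sql.connector.read.streaming`; the bodies are placeholders, not Iceberg's `SparkMicroBatchStream`):

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Placeholder skeleton of a micro-batch source; only the method shapes matter here.
public class ExampleMicroBatchStream implements MicroBatchStream {

  @Override
  public Offset latestOffset() {
    // Newest position available on the source; with rate limiting this is
    // where the byte budget would be applied.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Offset initialOffset() {
    // Position of the very first data when the stream started.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    // Split the (start, end] range into partitions for the executors.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // Tells executors how to read a given InputPartition.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Offset deserializeOffset(String json) {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void commit(Offset end) {}

  @Override
  public void stop() {}
}
```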
Iceberg's Spark stream offset - StreamingOffset
Spark expects the streaming source to implement an `Offset` class. An `Offset` is a logical representation of the position of the stream at which Spark is reading.
Iceberg's implementation (which already existed before this PR) is `StreamingOffset`.
It stores 3 properties about the Iceberg table position:
- `snapshotId`: the Iceberg table snapshot id
- `position`: the file position within a given Iceberg snapshot
- `scanAllFiles`: whether to stream all files in that snapshot - which includes (a) files added in older snapshots plus (b) net-new files added in the current snapshot - or to stream only the files that were newly added in the current snapshot.
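A simplified stand-in that shows just the three properties described above. The real `StreamingOffset` lives in the iceberg-spark module and also handles JSON parsing, validation, and equality; this sketch only illustrates the shape:

```java
// Simplified stand-in for Iceberg's StreamingOffset; field names mirror the
// description above, and the JSON encoding here is hand-rolled for brevity.
public class SimpleStreamingOffset extends org.apache.spark.sql.connector.read.streaming.Offset {
  private final long snapshotId;      // the Iceberg table snapshot id
  private final long position;        // file position within that snapshot
  private final boolean scanAllFiles; // true: all files in the snapshot; false: only newly added files

  public SimpleStreamingOffset(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }

  @Override
  public String json() {
    return String.format(
        "{\"snapshot_id\":%d,\"position\":%d,\"scan_all_files\":%b}",
        snapshotId, position, scanAllFiles);
  }
}
```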