Implement rate limiting while reading stream from Iceberg table as Spark3 DSv2 source #2789

Closed
@SreeramGarlapati

Description

This is a continuation of the work done in PR #2660: reading from an Iceberg table as an incremental streaming source in Spark.

Here's an idea proposed by @aokolnychyi - to use the limit API once we move to 3.1.

[Please refer to the Concepts section to fully understand the issue]

The Problem

The current implementation of the Iceberg DSv2 micro-batch source hands off all data available on the table to Spark3 DSv2. This can result in highly unpredictable/irregular stream sizes. This issue asks us to explore which concepts are available in Iceberg to rate limit the stream size.

Implementation proposal

Flow control can be maintained at the file level or the record level.
The Iceberg table metadata layer mainly operates at the file level - i.e., each table is a list of data (and delete) files.
While the ideal implementation of flow control would be at the record level - keeping in mind the ROI, the complexity of implementation & our current requirement - we only need file-level flow control.

The implementation idea is:

  1. introduce a Spark option to enable rate limiting - max-bytes-per-micro-batch - which switches to this implementation (a sketch of the resulting flow control follows this list)
  2. change the implementation of SparkMicroBatchStream to remember the table's stream position - the latestReturnedOffset
  3. change the implementation of latestOffset() to NOT return the Iceberg table's actual latest offset - but to apply flow control and determine what the NEW latestReturnedOffset can be
  4. change the OffsetStoreProvider to also record the latestCommittedOffset, to be able to resume the stream.
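
Below is a minimal, hypothetical sketch of how the byte budget could cap how far latestOffset() advances. The types SimpleOffset and PendingFile, the way pending files are handed in, and the "always take at least one file" policy are illustrative assumptions, not Iceberg's actual SparkMicroBatchStream internals.

```java
import java.util.List;

public class RateLimitedLatestOffsetSketch {

  /** Illustrative stand-in for Iceberg's StreamingOffset (snapshot id + file position). */
  record SimpleOffset(long snapshotId, long position) {}

  /** A data file pending to be streamed, plus the offset the stream reaches after it. */
  record PendingFile(long sizeInBytes, SimpleOffset offsetAfterFile) {}

  private final long maxBytesPerMicroBatch;   // value of the proposed max-bytes-per-micro-batch option
  private SimpleOffset latestReturnedOffset;  // remembered across latestOffset() calls

  RateLimitedLatestOffsetSketch(long maxBytesPerMicroBatch, SimpleOffset startOffset) {
    this.maxBytesPerMicroBatch = maxBytesPerMicroBatch;
    this.latestReturnedOffset = startOffset;
  }

  /**
   * Instead of returning the table's true latest offset, advance only as far as the byte
   * budget allows; files beyond the budget are left for later micro-batches.
   */
  SimpleOffset latestOffset(List<PendingFile> filesAfterLastReturnedOffset) {
    long consumedBytes = 0;
    SimpleOffset next = latestReturnedOffset;
    for (PendingFile file : filesAfterLastReturnedOffset) {
      // Assumed policy: always take at least one file so a single oversized file cannot stall the stream.
      if (consumedBytes > 0 && consumedBytes + file.sizeInBytes() > maxBytesPerMicroBatch) {
        break;  // budget exhausted: stop advancing the offset
      }
      consumedBytes += file.sizeInBytes();
      next = file.offsetAfterFile();
    }
    latestReturnedOffset = next;
    return next;
  }
}
```

On the read side, this would presumably be wired in with something like `.option("max-bytes-per-micro-batch", "536870912")` on the streaming reader, assuming the option name proposed above.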

Concepts needed to grok the above idea

Background on Spark 3 - MicroBatchStream

  1. Spark expects a micro-batch streaming source to implement the interface MicroBatchStream
  2. The Spark driver - as part of MicroBatchExecution - instantiates Iceberg's MicroBatchStream implementation & invokes the methods of this MicroBatchStream class in the order below (a bare-bones skeleton follows this list):
    - latestOffset() - asks the Iceberg streaming source to return the latest offset available on the stream
    - initialOffset() - asks the Iceberg streaming source to return the first-ever initial offset from when this stream started
    - planInputPartitions(start, end) - the Spark runtime picks a start offset and end offset based on the stream position it is at - & asks the Iceberg streaming source to return a data structure, InputPartition[] - which will later be distributed across Spark executors
    - createReaderFactory() - the Iceberg micro-batch streaming source should implement this method to tell a Spark executor how to read a given InputPartition
    - there are other standard methods on the MicroBatchStream interface - which will be invoked by Spark - like deserializeOffset, commit, stop - which are self-explanatory
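
For reference, here is a bare-bones skeleton of a MicroBatchStream implementation. The class name is made up for illustration and the method bodies are placeholders, not Iceberg's actual logic; only the interface methods come from Spark.

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

public class SkeletonMicroBatchStream implements MicroBatchStream {

  @Override
  public Offset latestOffset() {
    // Newest offset the source is willing to hand to Spark.
    // This is the hook where the proposed max-bytes-per-micro-batch cap would apply.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Offset initialOffset() {
    // Offset at which the stream first started.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    // Split the (start, end] range into InputPartitions for the executors.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // Tell executors how to read a given InputPartition.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Offset deserializeOffset(String json) {
    // Rebuild an Offset from its checkpointed JSON form.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void commit(Offset end) {
    // Called after Spark durably commits a batch ending at `end`.
  }

  @Override
  public void stop() {
    // Release any resources held by the source.
  }
}
```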

Iceberg's Spark stream offset - StreamingOffset

Spark expects the streaming source to implement the Offset class. An Offset is a logical representation of the position of the stream at which Spark is reading.
Iceberg's implementation (which already existed before this PR) is StreamingOffset.
It stores 3 properties about the Iceberg table position (a minimal stand-in class is sketched after this list):

  1. snapshotId: the Iceberg table snapshot id
  2. position: the file position within a given Iceberg snapshot
  3. scanAllFiles: whether to stream
    • all files in that snapshot - which includes (a) files added in older snapshots + (b) net NEW files added in the current snapshot
    • or to stream only the files that are newly added in the current snapshot
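
A minimal illustrative stand-in for such an offset is sketched below. The class name and the JSON field layout are assumptions for illustration, not a copy of Iceberg's actual StreamingOffset; only the three properties above and Spark's abstract Offset base class come from the source.

```java
import org.apache.spark.sql.connector.read.streaming.Offset;

public class SimpleStreamingOffset extends Offset {
  private final long snapshotId;      // which Iceberg snapshot the stream is positioned in
  private final long position;        // file position within that snapshot
  private final boolean scanAllFiles; // true: stream every file in the snapshot; false: only newly added files

  public SimpleStreamingOffset(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }

  @Override
  public String json() {
    // Spark checkpoints offsets via this JSON form; a hand-rolled string keeps the sketch short,
    // whereas a real implementation would use a proper JSON writer.
    return String.format(
        "{\"snapshot_id\":%d,\"position\":%d,\"scan_all_files\":%b}",
        snapshotId, position, scanAllFiles);
  }
}
```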
