Spark 3.3: support rate limit in Spark Streaming #4479
Conversation
Hi all, I am attempting to add support for rate limiting when streaming reads from an Iceberg table. I have put out an initial PR for this and would appreciate your thoughts on it :) cc @rdblue, @aokolnychyi, @RussellSpitzer, @kbendick, @jackye1995, @rajarshisarkar
Once the read limit is respected via this PR, will add support of
cc @rdblue
This is really cool. Currently, without being able to limit the input rows, it's not possible to be down for too long.
@@ -200,25 +210,43 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset)

    StreamingOffset currentOffset = null;

    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
the last ) should be ]?
Added ) in place of ] to show the range endOffset is not inclusive.
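To make that concrete, here is a tiny, self-contained illustration of the half-open range convention; the variable names are hypothetical and only mirror the comment above:

public class HalfOpenRangeDemo {
  public static void main(String[] args) {
    long startFileIndex = 3;
    long endFileIndex = 7;
    // [startFileIndex, endFileIndex): indexes 3, 4, 5 and 6 are read, 7 is not,
    // so the next micro-batch can begin exactly at endFileIndex with no overlap or gap.
    for (long fileIndex = startFileIndex; fileIndex < endFileIndex; fileIndex++) {
      System.out.println("reading data file at index " + fileIndex);
    }
  }
}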
@singhpk234 it's looking good. Just a few more small changes are suggested. Do you think you'll have time to address them next week? Cheers
Definitely @cccs-jc! Will address this feedback by the end of this week, and will also raise this against 3.3. Apologies, I have been kept busy by some internal deliverables.
Looks good to me! Waiting for any other comment from @RussellSpitzer
Looking forward to this feature @singhpk234 :-)
@RussellSpitzer do you have any further comment? If not I think this is good to go.
@RussellSpitzer good to go?
Looks like this has been inactive and I think it's ready to go. I know we just added Spark 3.4, but given that this PR has been open for a long time, I think it's better to get this in first. I will go ahead and merge it. @singhpk234 or @cccs-jc, could you also add this patch to 3.4 afterwards?
while (taskIter.hasNext()) {
  FileScanTask task = taskIter.next();
  if (curPos >= startPosOfSnapOffset) {
    // TODO : use readLimit provided in function param, the readLimits are derived from
@singhpk234 any reason we don't use this from the function parameter and marked it as a TODO? I see that limit is not used anywhere in the function.
// only 1 micro-batch will be formed and we will read data partially
Assert.assertEquals(
    1,
@singhpk234 👋 Can you help me understand why the expected result here is 1 and not 7? For example, the test "WithNumberOfRows_4" expects all of the test data to have been processed in 2 micro-batches.
Ohh, I see! The next file has 2 records, and the rate limiting isn't sophisticated enough to break a file into multiple microbatches (which makes sense).
But I am curious why the first list of records gets broken into two files deterministically. The first file has simple-record-1, and the second file has simple-record-2 and simple-record-3.
About the change
Supports rate limiting in Spark Structured Streaming via the Admission Control API. Rate limiting is required to get uniform micro-batch sizes. At present the Spark Admission Control API exposes rate limiting on two things: files read and records read.
Now we can have offsets that read snapshots partially (based on the file / record limits specified), and a micro-batch can span multiple snapshots where each snapshot may be only partially read. Since we already accounted for a micro-batch reading from [startFileIndex, endFileIndex), no changes to the micro-batch interface were needed.
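As a usage sketch, streaming reads would cap each micro-batch through read options; the option names below are assumptions based on my reading of this change (see SparkReadOptions in the diff for the authoritative constants):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RateLimitedStreamExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("rate-limit-example").getOrCreate();

    // Assumed option names: cap each micro-batch at 10 data files and 10,000 records.
    Dataset<Row> stream =
        spark
            .readStream()
            .format("iceberg")
            .option("streaming-max-files-per-micro-batch", "10")
            .option("streaming-max-rows-per-micro-batch", "10000")
            .load("db.events"); // hypothetical table identifier

    stream.printSchema();
  }
}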
Please refer below for more about the Admission Control API / ReadLimit:
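For orientation, here is a condensed sketch of that Spark surface as I understand it: SupportsAdmissionControl lets a MicroBatchStream advertise a default ReadLimit and receive a limit when computing the next end offset. The skeleton below is illustrative only and is not the implementation in this PR:

import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;

abstract class RateLimitedSourceSketch implements MicroBatchStream, SupportsAdmissionControl {

  @Override
  public ReadLimit getDefaultReadLimit() {
    // Tell Spark how much data one micro-batch should admit by default,
    // e.g. at most 10,000 records; ReadLimit.allAvailable() means no cap.
    return ReadLimit.maxRows(10_000L);
  }

  @Override
  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    // Called instead of latestOffset() when admission control is supported.
    // The returned end offset should advance only as far as `limit` allows, which is
    // what lets a snapshot be consumed partially across several micro-batches.
    return advanceWithinLimit(startOffset, limit);
  }

  // Hypothetical helper: walk files/records from startOffset until the limit is reached.
  abstract Offset advanceWithinLimit(Offset startOffset, ReadLimit limit);
}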
Concepts used in this PR
(copied from: #2660)
Background on Spark 3 - MicroBatchStream
Spark expects a micro-batch streaming source to implement the interface MicroBatchStream.
The Spark driver - as part of MicroBatchExecution - instantiates Iceberg's MicroBatchStream implementation and invokes the methods of this MicroBatchStream class in the below order:
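Roughly, that driver-facing surface looks like the outline below; the numbered ordering reflects the typical MicroBatchExecution flow per micro-batch and is my summary rather than text from this PR:

import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Outline of Spark's MicroBatchStream, roughly in the order the driver invokes it.
interface MicroBatchStreamOutline extends MicroBatchStream {
  Offset initialOffset();                                          // 1. start position when there is no checkpoint
  Offset latestOffset();                                           // 2. end offset for the next micro-batch
  InputPartition[] planInputPartitions(Offset start, Offset end);  // 3. plan the work for [start, end)
  PartitionReaderFactory createReaderFactory();                    // 4. executors use this to read the partitions
  void commit(Offset end);                                         // 5. mark everything up to `end` as processed
  void stop();                                                     // 6. release resources when the query stops
}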
Iceberg's Spark stream offset - StreamingOffset
Spark expects the streaming source to implement the Offset class. An Offset is a logical representation of the position of the stream.
Iceberg's implementation (which already existed before this PR) is StreamingOffset.
It stores 3 properties about the Iceberg table position: the snapshot ID, the position (file index) within that snapshot, and a flag that says whether to stream all files in that Snapshot - which includes (a) files added in older Snapshots plus (b) net NEW files added in the current Snapshot - or to stream only the files that are newly added in the current Snapshot.
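A minimal sketch of the state such an offset carries; the field names follow my reading of StreamingOffset and should be treated as assumptions rather than quoted code:

// Illustrative sketch only; see StreamingOffset in the Iceberg source for the real class.
final class StreamingOffsetSketch {
  final long snapshotId;       // which table snapshot the stream is positioned at
  final long position;         // index of the next data file to read within that snapshot
  final boolean scanAllFiles;  // true: stream every file in the snapshot (initial catch-up);
                               // false: stream only the files newly added by this snapshot

  StreamingOffsetSketch(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }
}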
Testing:
Added UTs to assert that the expected number of micro-batches is produced.
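As a shape for such a test, the hedged sketch below caps each micro-batch at one data file and counts batches with foreachBatch; the option name, table identifier, expected count, and the spark session field are all assumptions, not code from the PR:

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.junit.Assert;
import org.junit.Test;

public class TestRateLimitSketch {
  // Assumed to be provided by the surrounding test harness.
  private static SparkSession spark;

  @Test
  public void testOneFilePerMicroBatch() throws Exception {
    AtomicInteger microBatchCount = new AtomicInteger();

    Dataset<Row> stream =
        spark
            .readStream()
            .format("iceberg")
            .option("streaming-max-files-per-micro-batch", "1") // assumed option name
            .load("db.rate_limit_test");                        // hypothetical table

    StreamingQuery query =
        stream
            .writeStream()
            .foreachBatch(
                new VoidFunction2<Dataset<Row>, Long>() {
                  @Override
                  public void call(Dataset<Row> batch, Long batchId) {
                    microBatchCount.incrementAndGet();
                  }
                })
            .start();

    query.processAllAvailable();
    query.stop();

    // With 3 data files in the table and a cap of 1 file per batch, expect 3 micro-batches.
    Assert.assertEquals(3, microBatchCount.get());
  }
}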
Closes: #2789