
Spark 3.3: support rate limit in Spark Streaming #4479

Merged · 8 commits · Apr 22, 2023

Conversation

@singhpk234 (Contributor) commented Apr 3, 2022

About the change

Supports rate limiting in Spark Structured Streaming via the Admission Control API. Rate limiting is required to produce uniform micro-batch sizes. At present, Spark's Admission Control API exposes rate limiting on two dimensions: files read and records read.

With this change, an offset can read a snapshot partially (based on the file / record limits specified), and a micro-batch can span multiple snapshots, each of which may be read partially. Since we already accounted for a micro-batch reading the range [startFileIndex, endFileIndex), no changes to the micro-batch interface were needed.
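For illustration, a minimal sketch of how a consumer might cap micro-batch size once this lands. The option keys below are assumptions for this example (not confirmed names from this PR); the table name is a placeholder.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RateLimitedIcebergStream {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("iceberg-rate-limit").getOrCreate();

    Dataset<Row> stream =
        spark
            .readStream()
            .format("iceberg")
            // hypothetical per-micro-batch admission limits
            .option("streaming-max-files-per-micro-batch", "10")
            .option("streaming-max-rows-per-micro-batch", "10000")
            .load("db.table");

    // With limits in place, each trigger admits at most the configured number of
    // files/rows, so micro-batch sizes stay roughly uniform.
    stream.writeStream().format("console").start().awaitTermination();
  }
}
```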

Please refer to the following for more about the Admission Control API / ReadLimit:


Concepts used in this PR

(copied from : #2660)

Background on Spark 3 - MicroBatchStream

Spark expects a micro-batch streaming source to implement the MicroBatchStream interface.

The Spark driver, as part of MicroBatchExecution, instantiates Iceberg's MicroBatchStream implementation and invokes the methods of this MicroBatchStream class in the order below (a rough interface sketch follows the list):

  • latestOffset(Offset startOffset, ReadLimit readLimit) - asks the Iceberg streaming source for the latest offset available on the stream, given that the previous offset was startOffset.
  • initialOffset() - asks the Iceberg streaming source for the very first offset of the stream.
  • planInputPartitions(start, end) - the Spark runtime picks a start and end offset based on the stream position it is at, and asks the Iceberg streaming source to return an InputPartition[] data structure, which is later distributed across Spark executors.
  • createReaderFactory() - the Iceberg micro-batch streaming source implements this method to tell a Spark executor how to read a given InputPartition.
  • There are other standard methods on the MicroBatchStream interface that Spark invokes - such as deserializeOffset, commit, and stop - which are self-explanatory.
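A minimal sketch of how these hooks fit together with Spark's admission-control API. The class below is illustrative, not Iceberg's actual implementation; the ReadLimit factory methods are from Spark's public streaming connector API as I understand it, and the concrete limits are example values.

```java
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;

// Illustrative skeleton: a micro-batch stream that honors file/row read limits.
abstract class RateLimitedMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {

  @Override
  public ReadLimit getDefaultReadLimit() {
    // Derived from source options, e.g. max files / max rows per micro-batch.
    return ReadLimit.compositeLimit(
        new ReadLimit[] {ReadLimit.maxFiles(10), ReadLimit.maxRows(10_000L)});
  }

  @Override
  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    // Walk forward from startOffset, admitting files/records until the limit is
    // reached; the returned offset may stop partway through a snapshot.
    return advanceWithinLimit(startOffset, limit);
  }

  @Override
  public Offset latestOffset() {
    // Not called when SupportsAdmissionControl is implemented; Spark uses the
    // two-argument variant above instead.
    throw new UnsupportedOperationException("use latestOffset(Offset, ReadLimit)");
  }

  // Hypothetical helper representing the planning logic this PR adds.
  abstract Offset advanceWithinLimit(Offset startOffset, ReadLimit limit);
}
```

The key point is that Spark passes the effective ReadLimit into latestOffset, and the source decides how far to advance the end offset for the next micro-batch.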

Iceberg's Spark stream offset - StreamingOffset

Spark expects the streaming source to implement the Offset class. An Offset is a logical representation of the position in the stream at which Spark is reading. Iceberg's implementation (which already existed before this PR) is StreamingOffset.

It stores 3 properties about the Iceberg table position:

  • snapshotId: the Iceberg table snapshot id
  • position: the file position within a given Iceberg snapshot
  • scanAllFiles: whether to stream all files in that snapshot - which includes (a) files added in older snapshots and (b) net new files added in the current snapshot - or to stream only the files newly added in the current snapshot
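To make the offset semantics concrete, here is an illustrative value type mirroring those three properties (not Iceberg's actual StreamingOffset class):

```java
// Illustrative only: mirrors the three properties described above.
final class StreamPosition {
  final long snapshotId;      // Iceberg table snapshot id
  final long position;        // file index within that snapshot
  final boolean scanAllFiles; // true: all files in the snapshot; false: only newly added files

  StreamPosition(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }
}
```

With rate limiting, an end offset such as (snapshotId = S, position = 3) can land in the middle of snapshot S; the next micro-batch then resumes from file index 3 of the same snapshot.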


Testing:
Added unit tests to assert that the expected number of micro-batches is formed.

Closes : #2789

@singhpk234 singhpk234 closed this Apr 3, 2022
@singhpk234 singhpk234 reopened this Apr 4, 2022
@singhpk234 singhpk234 changed the title [WIP] Spark : support rate limit in SS [WIP] Spark 3.2: support rate limit in Spark Streaming Apr 4, 2022
@singhpk234 singhpk234 closed this Apr 4, 2022
@singhpk234 singhpk234 reopened this Apr 4, 2022
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch 2 times, most recently from 71ac6bb to 59be794 on April 5, 2022 12:11
@singhpk234 (Contributor, Author) commented Apr 5, 2022

Hi all,

I am attempting to support rate limiting when stream-reading an Iceberg table. I have put out an initial PR for this and would appreciate your thoughts on it :)

cc @rdblue, @aokolnychyi, @RussellSpitzer, @kbendick, @jackye1995, @rajarshisarkar

@singhpk234 singhpk234 changed the title [WIP] Spark 3.2: support rate limit in Spark Streaming Spark 3.2: support rate limit in Spark Streaming Apr 5, 2022
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from 59be794 to 471b53d on April 12, 2022 14:16
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch 2 times, most recently from c631ac9 to ed43f62 on April 25, 2022 15:32
@singhpk234 singhpk234 marked this pull request as ready for review April 25, 2022 15:44
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from ed43f62 to ee1f9ec on June 3, 2022 08:28
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from ee1f9ec to c0c60f6 on June 17, 2022 16:01
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from c0c60f6 to 91f8eb8 on July 18, 2022 05:39
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from 85a9228 to 240ec18 on August 2, 2022 13:31
@singhpk234 (Contributor, Author) commented:
Once the read limit is respected via this PR, I will add support for Trigger.AvailableNow() introduced in Spark 3.3: https://issues.apache.org/jira/browse/SPARK-36533
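For context, a hedged sketch of how Trigger.AvailableNow() would pair with the read limits; the table name is a placeholder and this is not part of the PR itself.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

public class AvailableNowSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("available-now").getOrCreate();

    Dataset<Row> stream = spark.readStream().format("iceberg").load("db.table");

    // Processes everything available at query start in bounded micro-batches
    // (respecting the source's read limits), then stops.
    stream.writeStream()
        .format("console")
        .trigger(Trigger.AvailableNow())
        .start()
        .awaitTermination();
  }
}
```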

@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch 2 times, most recently from 36b51b6 to a45847b on November 10, 2022 19:01
@singhpk234 (Contributor, Author) commented:
cc @rdblue

@cccs-jc (Contributor) commented Dec 2, 2022

This is really cool. Currently, without being able to limit the input rows, it's not possible to be down for too long.

@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch 2 times, most recently from 149f406 to ecd8de8 on January 8, 2023 06:07
.palantir/revapi.yml (outdated review thread, resolved)
@@ -200,25 +210,43 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

StreamingOffset currentOffset = null;

// [(startOffset : startFileIndex), (endOffset : endFileIndex) )
Contributor:
the last ) should be ]?

Contributor Author:
Added ) in place of ] to show that the range end (endOffset) is not inclusive.

@cccs-jc (Contributor) commented Mar 11, 2023

@singhpk234 it's looking good. Just a few more small changes are suggested. Do you think you'll have time to address them next week? Cheers

@singhpk234 (Contributor, Author) replied:

> @singhpk234 it's looking good. Just a few more small changes are suggested. Do you think you'll have time to address them next week? Cheers

Definitely @cccs-jc! Will address this feedback by the end of this week and also re-target this against 3.3. Apologies, I have been kept busy by some internal deliverables.

@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from 2739f50 to f23cefd on March 20, 2023 00:05
@singhpk234 singhpk234 changed the title Spark 3.2: support rate limit in Spark Streaming Spark 3.3: support rate limit in Spark Streaming Mar 20, 2023
@singhpk234 singhpk234 force-pushed the feature/support-rate-limiting-ss branch from 195cad6 to a29f84c on March 23, 2023 20:39
@jackye1995 (Contributor) left a review comment:
Looks good to me! Waiting for any other comment from @RussellSpitzer

@cccs-jc (Contributor) commented Apr 11, 2023

Looking forward to this feature @singhpk234 :-)

@jackye1995 (Contributor) commented:
@RussellSpitzer do you have any further comments? If not, I think this is good to go.

@cccs-jc (Contributor) commented Apr 22, 2023

@RussellSpitzer good to go?

@jackye1995 (Contributor) commented:
Looks like this has been inactive and I think it's ready to go.

I know we just added Spark 3.4, but given that this PR has been hanging for a long time, I think it's better to get this in first. I will go ahead and merge it. @singhpk234 or @cccs-jc, could you also add this patch to 3.4 afterwards?

@jackye1995 jackye1995 merged commit 029622b into apache:master Apr 22, 2023
manisin pushed a commit to Snowflake-Labs/iceberg that referenced this pull request May 9, 2023
while (taskIter.hasNext()) {
  FileScanTask task = taskIter.next();
  if (curPos >= startPosOfSnapOffset) {
    // TODO : use readLimit provided in function param, the readLimits are derived from
@namrathamyske (Contributor) commented Jun 15, 2023
@singhpk234 any reason we don't use this from the function parameter and marked it as a TODO? I see that the limit is not used anywhere in this function.


// only 1 micro-batch will be formed and we will read data partially
Assert.assertEquals(
    1,

@singhpk234 👋 Can you help me understand why the expected result here is 1 and not 7? For example, the test "WithNumberOfRows_4" expects all of the test data to have been processed in 2 micro-batches.

Ohh, I see! The next file has 2 records, and the rate limiting isn't sophisticated enough to break a file into multiple microbatches (which makes sense).

But I am curious why the first list of records gets broken into two files deterministically. The first file has simple-record-1, and the second file has simple-record-2 and simple-record-3.

Successfully merging this pull request may close these issues.

Implement rate limiting while reading stream from Iceberg table as Spark3 DSv2 source
7 participants