Add capabilities to ingest from another stream without disabling the realtime table #9289
Conversation
Codecov Report
@@             Coverage Diff              @@
##             master    #9289      +/-   ##
=============================================
+ Coverage     15.28%   68.57%   +53.29%
- Complexity      168     5007     +4839
=============================================
  Files          1814     1867       +53
  Lines         97379    99659     +2280
  Branches      14893    15158      +265
=============================================
+ Hits          14880    68345    +53465
+ Misses        81367    26396    -54971
- Partials       1132     4918     +3786
Instead of offsetCriteria, can we call it consumeFrom? We can translate it to whatever we want internally.
And the possible values can be forceEarliest, forceLatest, or best (yes, we need a third value).
forceEarliest and forceLatest will ignore any previous completed segment offsets and just do as forced. Pick the earliest or latest offsets and resume consumption.
The best option will attempt to minimize data loss, pick up the first available event after the last consumed event.
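The three proposed values can be sketched as below. This is an illustrative Java snippet only, under the assumption that offsets are plain longs; the names `ConsumeFrom`, `OffsetResolver`, and `resolve` are hypothetical and not part of Pinot's code:

```java
// Hypothetical sketch of the proposed consumeFrom semantics.
enum ConsumeFrom { FORCE_EARLIEST, FORCE_LATEST, BEST }

class OffsetResolver {
    static long resolve(ConsumeFrom mode, long earliestAvailable, long latestAvailable,
            long lastConsumed) {
        switch (mode) {
            case FORCE_EARLIEST:
                return earliestAvailable; // ignore previous completed segment offsets
            case FORCE_LATEST:
                return latestAvailable;   // ignore previous completed segment offsets
            default:
                // BEST: minimize data loss by picking the first available event
                // after the last consumed one.
                return Math.max(earliestAvailable, lastConsumed + 1);
        }
    }
}
```

Under this sketch, `BEST` falls back to the earliest available offset when the last consumed offset has already aged out of the stream.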
👍 on consumeFrom. It reads better that way, but I prefer to keep smallest and largest so the URI looks like:
/tables/<tableName>/resumeConsumption?consumeFrom=smallest
For the behavior of the best option you suggested, we currently achieve that when the consumeFrom parameter is not specified.
Let us add all three options, and say that the default is best. Also, I prefer earliest and latest as being more intuitive. Alternatives: oldestEvents, mostRecentEvents, fromLastPause.
7d2cb81 to 757a3c3
why not just keep this as smallest/largest, since users are already familiar with those for table consumption?
Sajjad originally had offsetCriteria=smallest|largest. I was the one asking him to change, and we are now at consumeFrom=earliest|latest|best.
Since two of you prefer smallest/largest, I can go with that. I also felt that we should have a third (optional) value. If we settle on best, then most people may choose it thinking it is some automatic selection. In reality, what we mean is that we will pick up from where we left off.
If you both feel that leaving out the third option is ideal, I can go with that as well.
Let us wrap this up today, thanks.
This is how I see pause/resume feature. Operator pauses consumption for some reason. After a while, they want to resume the consumption. The default behavior should be to pick up the events from the offset where we left off. If the offset is gone, we should automatically start with the smallest available offset. So that's the default behavior, but if operator wants to change the offset for some reason like stream connection change, then consumption should resume based on the provided "resumeFrom" parameter. As Neha mentioned, since users/operators are familiar with smallest/largest offset criteria, IMO it's better to use the same values.
+1 on keeping smallest/largest. Imo no need of "best" as that's the default anyway.
should we have the timestamp and period offset criteria also, now that we support those too?
These may not apply to the use cases that we are considering, but for completeness, we can have it (or, we can choose to add later)
The change is very simple, but I think we can add it later when there's a request for it.
mcvsubbu
left a comment
The approach looks fine to me, but the code is a bit confusing to read.
Overall, I am fine with this
Can you add a comment before this line describing what this map is supposed to contain?
The logic in this class is getting quite hard to read. Can we extract some methods into a base class and subclass the partitionGroup vs. partitionId handling for the two different types of streams we support?
I'll add the comments.
For the refactoring, I agree with you that this class is hard to read. It already has more than 1500 lines and it's worth refactoring, but the refactoring is outside the scope of this PR.
It may make things more readable if we get the smallest offset all the time? Does it involve multiple calls to the stream, and is that what we are optimizing here? If so, it is good to add a comment. Otherwise, getting it once unconditionally may make things a bit more readable.
Yes, this is for optimization. Some topics have hundreds of partitions and we shouldn't call the stream to get the same metadata hundreds of times. I'll add the comments.
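The optimization described here can be sketched as a per-topic cache, so the stream is queried once regardless of partition count. This is an illustrative example, not Pinot's actual code; `SmallestOffsetCache` and `smallestOffset` are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch: fetch the smallest offset for a topic once and reuse
// it for all partitions, instead of calling the stream once per partition.
class SmallestOffsetCache {
    private final Map<String, Long> cache = new HashMap<>();
    private final Function<String, Long> fetchFromStream; // one stream call per topic

    SmallestOffsetCache(Function<String, Long> fetchFromStream) {
        this.fetchFromStream = fetchFromStream;
    }

    long smallestOffset(String topic) {
        // computeIfAbsent invokes the fetcher only on the first request per topic
        return cache.computeIfAbsent(topic, fetchFromStream);
    }
}
```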
Can you consider removing the offsetCriteria from the argument here, and incorporating the logic to deal with a non-null value of offsetCriteria outside this method? Not sure if it will make the logic more readable, but worth a try, I think
The selectStartOffset method is used in two places. If we move the non-null check out of this method, we need to repeat it in both places. That's why it was added to the method.
Also, this method selects the start offset: if the offset criteria is provided, it gets the start offset from one map, and if not, it gets it from the other map. So that's another reason I think the non-null check belongs in this method.
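A minimal sketch of the structure being discussed, purely for illustration; the class, method, and map names are hypothetical and the actual Pinot code differs:

```java
import java.util.Map;

// Hypothetical sketch of the selectStartOffset logic described in this thread.
class StartOffsetSelector {
    // offsetCriteria non-null: use offsets freshly resolved from the stream
    //   according to the criteria (e.g. smallest/largest).
    // offsetCriteria null: resume from the offsets recorded at pause time.
    static Long selectStartOffset(String offsetCriteria, int partitionId,
            Map<Integer, Long> criteriaResolvedOffsets, Map<Integer, Long> pausedOffsets) {
        return offsetCriteria != null
            ? criteriaResolvedOffsets.get(partitionId)
            : pausedOffsets.get(partitionId);
    }
}
```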
c78ef83 to c43ae70
mcvsubbu
left a comment
Sajjad and I met offline and decided that it is best we stick to the well-known terms here.
I have no further comments, lgtm
Description
With this PR, an operator can now change the underlying stream without disabling the table. The operator needs to do the following:
When the stream is changed, partitions will have new offsets. Before this PR, a resume request could only resume consumption from the offsets at which the stream was paused. Since the offsets from the previous stream can't be used for the new stream, the resume request should specify an offset criteria as the starting point of consumption from the new stream.
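As an illustration of issuing the resume call with an explicit criteria after switching streams, the snippet below only builds the HTTP request and does not send it; the controller address and table name are assumptions, and `ResumeRequestExample` is a hypothetical helper, not part of Pinot:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative example of constructing a resume request with an explicit
// consumeFrom value after pointing the table at a new stream.
class ResumeRequestExample {
    static HttpRequest buildResumeRequest(String controllerBaseUrl, String tableName,
            String consumeFrom) {
        return HttpRequest.newBuilder()
            .uri(URI.create(controllerBaseUrl + "/tables/" + tableName
                + "/resumeConsumption?consumeFrom=" + consumeFrom))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }
}
```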
Testing Done
Ran LLCRealtimeClusterIntegrationTest locally and created two topics (the consumption started with the first topic).