Add capabilities to ingest from another stream without disabling the realtime table #9289

Merged: sajjad-moradi merged 5 commits into apache:master from sajjad-moradi:feature/resume.with.offset.criteria on Aug 30, 2022
Conversation

@sajjad-moradi (Contributor):

Description

With this PR, an operator can change the underlying stream without disabling the table. The operator needs to do the following:

  1. Issue a pause request to the table.
  2. Change the stream by modifying the table's stream configs (e.g., topic name, cluster name).
  3. Issue a resume request with the desired offset criteria.

When a stream is changed, partitions will have new offsets. Before this PR, a resume request could only resume consumption from the offsets at which the stream was paused. Since the offsets from the previous stream can't be used for the new stream, the resume request should specify an offset criteria for the starting point of consumption from the new stream.
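The three-step flow above can be sketched as URL construction against the controller's REST API. This is a minimal sketch: the controller address is a placeholder, and the `pauseConsumption` endpoint name is an assumption, while `resumeConsumption` and `consumeFrom` come from the discussion below.

```python
CONTROLLER = "http://localhost:9000"  # placeholder controller address (assumption)

def pause_url(table_name):
    # Step 1: pause consumption for the realtime table.
    # Endpoint name is an assumption based on this PR's pause/resume discussion.
    return f"{CONTROLLER}/tables/{table_name}/pauseConsumption"

def resume_url(table_name, consume_from=None):
    # Step 3: resume; consumeFrom optionally forces the offset criteria
    # (e.g. "smallest") when the underlying stream has been swapped.
    url = f"{CONTROLLER}/tables/{table_name}/resumeConsumption"
    if consume_from is not None:
        url += f"?consumeFrom={consume_from}"
    return url

# Step 2 (editing the stream configs in the table config) happens between the
# two requests, outside this sketch.
```

When no `consumeFrom` is passed, the resume request falls back to the default behavior of picking up from where consumption was paused.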

Testing Done

  • Modified LLCRealtimeClusterIntegrationTest locally and created two topics (the consumption started with the first topic)
  • Issued a pause request
  • Modified the table config to point to the second topic
  • Issued a resume request with 'smallest' as offset criteria
  • Verified that the records in the table include data from both topics
  • Verified the desired values in segment ZK metadata
  • Repeated the above steps with 'largest' offset criteria
  • Repeated the above steps with no offset criteria (for this case, the topic wasn't changed)

@codecov-commenter

codecov-commenter commented Aug 27, 2022

Codecov Report

Merging #9289 (c43ae70) into master (5ecca80) will increase coverage by 53.29%.
The diff coverage is 42.85%.

@@              Coverage Diff              @@
##             master    #9289       +/-   ##
=============================================
+ Coverage     15.28%   68.57%   +53.29%     
- Complexity      168     5007     +4839     
=============================================
  Files          1814     1867       +53     
  Lines         97379    99659     +2280     
  Branches      14893    15158      +265     
=============================================
+ Hits          14880    68345    +53465     
+ Misses        81367    26396    -54971     
- Partials       1132     4918     +3786     
Flag           Coverage Δ
integration1   26.24% <42.85%>  (?)
unittests1     67.08% <ø>       (?)
unittests2     15.28% <38.77%>  (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ller/api/resources/PinotRealtimeTableResource.java 0.00% <0.00%> (ø)
...r/validation/RealtimeSegmentValidationManager.java 74.32% <33.33%> (+30.66%) ⬆️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 70.56% <47.61%> (+9.87%) ⬆️
...pinot/plugin/metrics/yammer/YammerJmxReporter.java 100.00% <0.00%> (ø)
...ugin/inputformat/csv/CSVRecordExtractorConfig.java 0.00% <0.00%> (ø)
.../pinot/server/api/resources/PinotServerLogger.java 0.00% <0.00%> (ø)
.../starter/helix/HelixInstanceDataManagerConfig.java 80.43% <0.00%> (ø)
...che/pinot/plugin/metrics/yammer/YammerMetered.java 25.00% <0.00%> (ø)
...plugin/segmentuploader/SegmentUploaderDefault.java 87.09% <0.00%> (ø)
.../helix/FreshnessBasedConsumptionStatusChecker.java 0.00% <0.00%> (ø)
... and 1383 more


Contributor:

Instead of offsetCriteria, can we call it consumeFrom? We can translate it to whatever we want internally.

And the possible values can be forceEarliest, forceLatest, or best (yes, we need a third value).

forceEarliest and forceLatest will ignore any previously completed segment offsets and just do as forced: pick the earliest or latest offsets and resume consumption.

The best option will attempt to minimize data loss by picking up the first available event after the last consumed event.

Contributor Author:

👍 on consumeFrom. It reads better that way, but I prefer to keep smallest and largest so the URI looks like:
/tables/<tableName>/resumeConsumption?consumeFrom=smallest

For the behavior of the best option you suggested, we currently achieve that if the consumeFrom parameter is not specified.

Contributor:

Let us add all three options, and say that the default is best. Also, I prefer earliest and latest as being more intuitive. Alternatives: oldestEvents, mostRecentEvents, fromLastPause.

Contributor Author:

Done.

@sajjad-moradi force-pushed the feature/resume.with.offset.criteria branch 2 times, most recently from 7d2cb81 to 757a3c3, on August 29, 2022 at 00:54
Contributor:

Why not just keep this as smallest/largest, since users are already familiar with those for table consumption?

Contributor:

Sajjad originally had offsetCriteria=smallest|largest. I was the one asking him to change, and we are now at consumeFrom=earliest|latest|best.

Since two of you prefer smallest/largest, I can go with that. I also felt that we should have a third (optional) value. If we settle on best, most people may choose it thinking it is some automatic mode; in reality, what we mean is that we will pick up from where we left off.

If you both feel that leaving out the third option is ideal, I can go with that as well.

Let us wrap this up today, thanks.

Contributor Author:

This is how I see the pause/resume feature: an operator pauses consumption for some reason, and after a while, they want to resume. The default behavior should be to pick up events from the offset where we left off. If that offset is gone, we should automatically start from the smallest available offset. That's the default behavior, but if the operator wants to change the offset for some reason, like a stream connection change, then consumption should resume based on the provided "resumeFrom" parameter. As Neha mentioned, since users/operators are familiar with the smallest/largest offset criteria, IMO it's better to use the same values.
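The default-vs-forced behavior described above can be sketched as a small selection function. This is illustrative only (the function and parameter names are not the PR's actual code):

```python
def select_resume_offset(last_offset, smallest, largest, consume_from=None):
    """Pick the offset to resume consumption from, per the behavior above."""
    # An explicit criteria in the resume request overrides everything.
    if consume_from == "smallest":
        return smallest
    if consume_from == "largest":
        return largest
    # Default: pick up where we left off; if that offset has aged out of the
    # stream's retention, fall back to the smallest available offset.
    if last_offset is not None and last_offset >= smallest:
        return last_offset
    return smallest
```

The key property is that omitting the parameter gives the safe "resume where we paused, else smallest" default, so the forced values only matter when the operator has deliberately changed the stream.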

Contributor:

+1 on keeping smallest/largest. IMO there's no need for "best" as that's the default anyway.

Contributor:

Should we also have the timestamp and period offset criteria, now that we support those?

Contributor:

These may not apply to the use cases that we are considering, but for completeness, we can have it (or, we can choose to add later)

Contributor Author:

The change is very simple, but I think we can add it later when there's a request for it.

Contributor @mcvsubbu left a comment:

The approach looks fine to me; the code is a bit confusing to read.

Overall, I am fine with this

Contributor:

Can you add a comment before this line explaining what this map is supposed to contain?

The logic in this class is getting quite hard to read. Can we move some methods into a base class and sub-class the partitionGroup vs partitionId handling for the two different types of streams we support?

Contributor Author:

I'll add the comments.

For the refactoring, I agree with you that this class is hard to read. It already has more than 1500 lines and it's worth refactoring, but the refactoring is outside the scope of this PR.

Contributor:

It may make things more readable if we get the smallest offset all the time. Does it involve multiple calls to the stream, and is that what we are optimizing here? If so, it's good to add a comment. Otherwise, getting it once unconditionally may make things a bit more readable.

Contributor Author:

Yes, this is for optimization. Some topics have hundreds of partitions, and we shouldn't call the stream to fetch the same metadata hundreds of times. I'll add the comments.
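The optimization being discussed can be illustrated with a simple memoized fetch: the stream's metadata is requested once per topic and reused for every partition. This is a sketch with hypothetical names, not the PR's actual code:

```python
def make_offset_fetcher(fetch_smallest_offsets):
    # Memoize per-topic metadata so a topic with hundreds of partitions
    # triggers one stream metadata call instead of one call per partition.
    cache = {}
    def get(topic):
        if topic not in cache:
            cache[topic] = fetch_smallest_offsets(topic)
        return cache[topic]
    return get
```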

Contributor:

Can you consider removing the offsetCriteria from the argument here, and incorporating the logic to deal with a non-null value of offsetCriteria outside this method? Not sure if it will make the logic more readable, but worth a try, I think

Contributor Author:

The selectStartOffset method is used in two places. If we move the non-null check out of this method, we'd need to repeat it in both places; that's why it was added to the method.
Also, this method selects the start offset: if the offset criteria is provided, it gets the start offset from one map, and if not, it gets it from the other map. That's another reason I think the non-null check belongs in this method.

@sajjad-moradi force-pushed the feature/resume.with.offset.criteria branch from c78ef83 to c43ae70 on August 29, 2022 at 19:50
Contributor @mcvsubbu left a comment:

Sajjad and I met offline and decided that it is best we stick to the well-known terms here.

I have no further comments, lgtm

4 participants