Skip to content

Support pause/resume consumption of realtime tables#8986

Merged
sajjad-moradi merged 11 commits intoapache:masterfrom
sajjad-moradi:feature/pause.consumption
Jul 20, 2022
Merged

Support pause/resume consumption of realtime tables#8986
sajjad-moradi merged 11 commits intoapache:masterfrom
sajjad-moradi:feature/pause.consumption

Conversation

@sajjad-moradi
Copy link
Contributor

Description

This PR adds support for pause/resume consumption on realtime table without disabling the table. The following endpoints on controller can be used to pause/resume consumption:

  • /tables/{tableName}/pauseConsumption
  • /tables/{tableName}/resumeConsumption
  • /tables/{tableName}/consumptionStatus

For more design details, please refer to this doc.

Note that this PR will solve the following issues:
#6302, #7280, #6854, #6679, #7100, and #7039.

Testing Done

Verified the desired behavior locally by running LLRealtimeClusterIntegrationTest and using the endpoints to pause, resume, and get the consumption status.

@codecov-commenter
Copy link

codecov-commenter commented Jun 29, 2022

Codecov Report

Merging #8986 (090cdc9) into master (db07f29) will increase coverage by 41.12%.
The diff coverage is 10.90%.

@@              Coverage Diff              @@
##             master    #8986       +/-   ##
=============================================
+ Coverage     28.83%   69.96%   +41.12%     
- Complexity       47     4743     +4696     
=============================================
  Files          1805     1835       +30     
  Lines         94438    97116     +2678     
  Branches      14140    14630      +490     
=============================================
+ Hits          27234    67943    +40709     
+ Misses        64654    24449    -40205     
- Partials       2550     4724     +2174     
Flag Coverage Δ
integration1 26.34% <8.48%> (-0.18%) ⬇️
integration2 24.74% <8.48%> (-0.27%) ⬇️
unittests1 66.91% <3.33%> (?)
unittests2 15.32% <5.10%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ache/pinot/common/messages/ForceCommitMessage.java 0.00% <0.00%> (ø)
...ot/common/protocols/SegmentCompletionProtocol.java 95.71% <ø> (+3.33%) ⬆️
...he/pinot/controller/api/resources/PauseStatus.java 0.00% <0.00%> (ø)
.../api/resources/PeriodicTaskInvocationResponse.java 0.00% <0.00%> (ø)
...es/PinotControllerPeriodicTaskRestletResource.java 0.00% <0.00%> (ø)
...ller/api/resources/PinotRealtimeTableResource.java 0.00% <0.00%> (ø)
...inot/controller/helix/ControllerRequestClient.java 54.78% <0.00%> (+1.78%) ⬆️
...ntroller/helix/core/PinotHelixResourceManager.java 67.50% <0.00%> (+28.59%) ⬆️
.../helix/core/realtime/SegmentCompletionManager.java 72.96% <0.00%> (+26.12%) ⬆️
...server/starter/helix/HelixInstanceDataManager.java 73.52% <0.00%> (-5.01%) ⬇️
... and 1313 more

Copy link
Contributor

@mcvsubbu mcvsubbu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should also check the idealstate once before entering the consumeLoop()

@sajjad-moradi
Copy link
Contributor Author

You should also check the idealstate once before entering the consumeLoop()

That would be for handling a rare edge case. The probability of that case happening is very low and can be calculated as t1/t2 where
t1: time duration between receiving online-to-consuming msg and entering the consumeLoop
t2: consumeLoop duration
Note that t1 is in microseconds-milliseconds and t2 is in hours (depending on table config).

If that case happens, the partition will eventually be paused when after the segment gets completed. The operator can reissue the pause request if they need it sooner.

@mcvsubbu
Copy link
Contributor

mcvsubbu commented Jul 2, 2022

You should also check the idealstate once before entering the consumeLoop()

That would be for handling a rare edge case. The probability of that case happening is very low and can be calculated as t1/t2 where t1: time duration between receiving online-to-consuming msg and entering the consumeLoop t2: consumeLoop duration Note that t1 is in microseconds-milliseconds and t2 is in hours (depending on table config).

If that case happens, the partition will eventually be paused when after the segment gets completed. The operator can reissue the pause request if they need it sooner.

Makes sense. Keeps the first version simpler while kicking the tires. We can enhance this as we hear back regarding its use.

IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer returning a different class here than PauseStatus, since PauseStatus is an externally visible class, and we need to keep compatibility. For internal methods, we should be able to change things as needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not internally used. We return the pause status to the client.

Copy link
Contributor

@mcvsubbu mcvsubbu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this looks good, other than minor comments.

Copy link
Contributor

@mcvsubbu mcvsubbu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than a few minor comments, lgtm

}
if (!idealState.isEnabled()) {
throw new ControllerApplicationException(LOGGER, "Ideal State is disabled for table" + tableNameWithType,
throw new ControllerApplicationException(LOGGER, "Ideal State is disabled for table " + tableNameWithType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new ControllerApplicationException(LOGGER, "Ideal State is disabled for table " + tableNameWithType,
throw new ControllerApplicationException(LOGGER, "Disabled table: " + tableNameWithType,

for (int i = 0; i < 30; i++) {
PinotLLCRealtimeSegmentManager.PauseStatus pauseStatus =
getControllerRequestClient().getPauseStatus(getTableName());
for (int i = 0; i < 60; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we declare 30/60/anytning as a final int since it is used in more than one place (and they need to be consistent)? thanks

@KKcorps
Copy link
Contributor

KKcorps commented Jul 20, 2022

@sajjad-moradi There seems to be test failure on this -

 Error:    PauseResumeConsumptionIntegrationTest.setUp:69->verifyPause:104->verify:116->assertCountStar:138 expected [9292] but found [5000]
[33317](https://github.com/apache/pinot/runs/7288357664?check_suite_focus=true#step:5:33318)

@sajjad-moradi
Copy link
Contributor Author

sajjad-moradi commented Jul 20, 2022

@sajjad-moradi There seems to be test failure on this -

 Error:    PauseResumeConsumptionIntegrationTest.setUp:69->verifyPause:104->verify:116->assertCountStar:138 expected [9292] but found [5000]
[33317](https://github.com/apache/pinot/runs/7288357664?check_suite_focus=true#step:5:33318)

The test takes a long time. Disabling it for now. Added a comment there Deleted and will bring it back later with shorter execution time (maybe by producing less events).
The e2e functionality has been tested locally though, so it's not worth increasing the timeout which in turns lead to much higher build time.

@sajjad-moradi sajjad-moradi force-pushed the feature/pause.consumption branch from 801fe1c to 090cdc9 Compare July 20, 2022 21:27
@sajjad-moradi sajjad-moradi merged commit 6903856 into apache:master Jul 20, 2022
@KKcorps KKcorps added the release-notes Referenced by PRs that need attention when compiling the next release notes label Aug 12, 2022
@npawar
Copy link
Contributor

npawar commented Aug 27, 2022

@sajjad-moradi would you please add this to the documentation? (or point me to it if you already have, I couldn't find it easily). I think this would be a good place: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion, for all of them pause/resume/forceCommit/change stream.

@sajjad-moradi
Copy link
Contributor Author

@sajjad-moradi would you please add this to the documentation? (or point me to it if you already have, I couldn't find it easily). I think this would be a good place: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion, for all of them pause/resume/forceCommit/change stream.

I'll try to update the doc this coming week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature release-notes Referenced by PRs that need attention when compiling the next release notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants