Support pause/resume consumption of realtime tables#8986
sajjad-moradi merged 11 commits into apache:master
Codecov Report
@@ Coverage Diff @@
## master #8986 +/- ##
=============================================
+ Coverage 28.83% 69.96% +41.12%
- Complexity 47 4743 +4696
=============================================
Files 1805 1835 +30
Lines 94438 97116 +2678
Branches 14140 14630 +490
=============================================
+ Hits 27234 67943 +40709
+ Misses 64654 24449 -40205
- Partials 2550 4724 +2174
mcvsubbu
left a comment
You should also check the idealstate once before entering the consumeLoop()
That would be for handling a rare edge case. The probability of that case happening is very low and can be calculated as … If that case happens, the partition will eventually be paused after the segment gets completed. The operator can reissue the pause request if they need it sooner.
Makes sense. Keeps the first version simpler while kicking the tires. We can enhance this as we hear back regarding its use.
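For reference, the pre-check suggested above (and deferred by the thread) could look roughly like the following. This is a minimal sketch, not Pinot's actual consumer: `PausableConsumerSketch`, its `run` method, and the `BooleanSupplier` used to look up the pause flag are all hypothetical names introduced for illustration.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a partition consumer; not a Pinot class.
class PausableConsumerSketch {
  // Assumed lookup that reports whether the pause flag is set in the ideal state.
  private final BooleanSupplier _isPausedInIdealState;
  private int _batchesConsumed = 0;

  PausableConsumerSketch(BooleanSupplier isPausedInIdealState) {
    _isPausedInIdealState = isPausedInIdealState;
  }

  /** Returns the number of batches consumed; 0 if the table was already paused. */
  int run(int maxBatches) {
    // Check the ideal state once before entering the consume loop.
    if (_isPausedInIdealState.getAsBoolean()) {
      return 0;
    }
    consumeLoop(maxBatches);
    return _batchesConsumed;
  }

  private void consumeLoop(int maxBatches) {
    for (int i = 0; i < maxBatches; i++) {
      _batchesConsumed++; // placeholder for fetching and indexing one batch
    }
  }
}
```

Without the check, a partition that starts consuming right after the flag is set keeps going until its segment completes, which is the edge case the thread accepts for the first version.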
pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
...ntroller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
...ller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
    return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."
I would prefer returning a different class here than PauseStatus, since PauseStatus is an externally visible class and we need to keep compatibility. For internal methods, we should be able to change things as needed.
This is not internally used. We return the pause status to the client.
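The pause flow under discussion (set the flag, find the consuming segments, ask servers to force-commit them, return a status to the client) can be sketched against an in-memory model. Everything here is an assumption for illustration: `FakeIdealState`, `PauseResult`, and the `"CONSUMING"`/`"ONLINE"` strings are hypothetical stand-ins, and only the visible part of the truncated description string from the diff is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PauseFlowSketch {
  // Hypothetical in-memory stand-in for a table's ideal state.
  static class FakeIdealState {
    boolean _paused;
    // segment name -> state, e.g. "CONSUMING" or "ONLINE" (assumed labels)
    final Map<String, String> _segmentStates = new HashMap<>();
  }

  // Hypothetical result type returned to the caller; field names are illustrative.
  static class PauseResult {
    final boolean _pauseFlag;
    final Set<String> _consumingSegments;
    final String _description;

    PauseResult(boolean pauseFlag, Set<String> consumingSegments, String description) {
      _pauseFlag = pauseFlag;
      _consumingSegments = consumingSegments;
      _description = description;
    }
  }

  // Records which segments would receive a force-commit message.
  final List<String> _forceCommitMessages = new ArrayList<>();

  PauseResult pause(FakeIdealState idealState) {
    // Step 1: set the pause flag in the ideal state.
    idealState._paused = true;
    // Step 2: collect segments that are still consuming.
    Set<String> consuming = new TreeSet<>();
    for (Map.Entry<String, String> entry : idealState._segmentStates.entrySet()) {
      if ("CONSUMING".equals(entry.getValue())) {
        consuming.add(entry.getKey());
      }
    }
    // Step 3: stand-in for sending force-commit messages to servers.
    _forceCommitMessages.addAll(consuming);
    // Step 4: report the outcome; no description when nothing is consuming.
    String description = consuming.isEmpty() ? null : "Pause flag is set.";
    return new PauseResult(true, consuming, description);
  }
}
```

Because the result object is handed back to the client, its shape is part of the external contract, which is the compatibility concern raised in the review.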
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
...server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
mcvsubbu
left a comment
Overall, this looks good, other than minor comments.
mcvsubbu
left a comment
Other than a few minor comments, lgtm
      }
      if (!idealState.isEnabled()) {
    -   throw new ControllerApplicationException(LOGGER, "Ideal State is disabled for table" + tableNameWithType,
    +   throw new ControllerApplicationException(LOGGER, "Ideal State is disabled for table " + tableNameWithType,
Suggested change:
    -   throw new ControllerApplicationException(LOGGER, "Ideal State is disabled for table " + tableNameWithType,
    +   throw new ControllerApplicationException(LOGGER, "Disabled table: " + tableNameWithType,
    for (int i = 0; i < 30; i++) {
      PinotLLCRealtimeSegmentManager.PauseStatus pauseStatus =
          getControllerRequestClient().getPauseStatus(getTableName());
    for (int i = 0; i < 60; i++) {
Can we declare 30/60/anything as a final int, since it is used in more than one place (and the values need to be consistent)? Thanks.
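One way to act on this is to route every wait loop through a single helper with a shared bound. The following is a sketch only: `PauseStatusPollingSketch`, `MAX_STATUS_CHECKS`, and `waitFor` are hypothetical names, and the value 60 is simply the larger of the two bounds shown in the diff.

```java
import java.util.function.BooleanSupplier;

class PauseStatusPollingSketch {
  // One shared bound so every wait loop in the test stays consistent.
  // The name is hypothetical; 60 is taken from the loop bound in the diff above.
  static final int MAX_STATUS_CHECKS = 60;

  /**
   * Polls the condition up to MAX_STATUS_CHECKS times, sleeping between checks.
   * Returns true as soon as the condition holds, false if it never does.
   */
  static boolean waitFor(BooleanSupplier condition, long sleepMillis) {
    for (int i = 0; i < MAX_STATUS_CHECKS; i++) {
      if (condition.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }
}
```

A test could then call `waitFor` with a lambda that fetches the pause status and checks that no consuming segments remain, instead of repeating the literal loop bound.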
@sajjad-moradi There seems to be a test failure on this -
The test takes a long time.
801fe1c to 090cdc9
@sajjad-moradi would you please add this to the documentation? (Or point me to it if you already have; I couldn't find it easily.) I think this would be a good place: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion, for all of them: pause/resume/forceCommit/change stream.
I'll try to update the doc this coming week.
Description
This PR adds support for pausing and resuming consumption on a realtime table without disabling the table. The following endpoints on the controller can be used to pause/resume consumption:
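As a rough illustration of how a Java client might call such endpoints, the sketch below only builds the three requests and does not send them. The paths `/tables/{tableName}/pauseConsumption`, `/tables/{tableName}/resumeConsumption`, and `/tables/{tableName}/pauseStatus` are assumptions (check the controller's API listing before relying on them), and `PauseResumeRequestsSketch` with its method names is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; endpoint paths below are assumed, not confirmed by this page.
class PauseResumeRequestsSketch {
  private final String _controllerBaseUrl;

  PauseResumeRequestsSketch(String controllerBaseUrl) {
    _controllerBaseUrl = controllerBaseUrl;
  }

  // Assumed: POST /tables/{tableName}/pauseConsumption
  HttpRequest pause(String tableName) {
    return post(tableName, "pauseConsumption");
  }

  // Assumed: POST /tables/{tableName}/resumeConsumption
  HttpRequest resume(String tableName) {
    return post(tableName, "resumeConsumption");
  }

  // Assumed: GET /tables/{tableName}/pauseStatus
  HttpRequest status(String tableName) {
    return HttpRequest.newBuilder(uri(tableName, "pauseStatus")).GET().build();
  }

  private HttpRequest post(String tableName, String action) {
    return HttpRequest.newBuilder(uri(tableName, action))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
  }

  private URI uri(String tableName, String action) {
    return URI.create(_controllerBaseUrl + "/tables/" + tableName + "/" + action);
  }
}
```

Each request can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` from `java.net.http`; the controller address, for example `http://localhost:9000`, is likewise an assumption about a local setup.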
For more design details, please refer to this doc.
Note that this PR will solve the following issues:
#6302, #7280, #6854, #6679, #7100, and #7039.
Testing Done
Verified the desired behavior locally by running LLRealtimeClusterIntegrationTest and using the endpoints to pause, resume, and get the consumption status.