Realtime table consumption resume API #8663
cc: @npawar @Jackie-Jiang
Codecov Report

```diff
@@            Coverage Diff             @@
##           master    #8663      +/-  ##
============================================
+ Coverage   68.02%   69.68%    +1.65%
- Complexity   4616     4617        +1
============================================
  Files        1732     1733        +1
  Lines       90734    90762       +28
  Branches    13507    13512        +5
============================================
+ Hits        61724    63246     +1522
+ Misses      24672    23109     -1563
- Partials     4338     4407       +69
```
Let's move the actual logic into the PinotLLCRealtimeSegmentManager and inject that into this API class.
We should not ask the user to provide this info. As a starting point, we can use the end offset of the last completed segment as the start offset of the new segment. Then we can add support for all the OffsetCriteria and also the custom offset.
I was following the discussion on #6679, and the general consensus seemed to be to first allow explicitly specifying partitionId -> offset to create new consuming segments, and then support implicitly deriving the offsets from existing segments?
If that's not the case, I will change this.
Anyway, I've made the changes. We can add support for explicitly specifying offsets on top of that.
This to-do should be addressed in the first version
Yes, that's the plan. Will address.
First check if there is already a consuming segment for this partition
```diff
- @ApiOperation(value = "Resume a realtime table", notes = "Resume a segment")
+ @ApiOperation(value = "Resume the consumption of a realtime table", notes = "Resume the consumption of a realtime table")
```
I feel this should be under the table tag
```diff
- @Path("/segmentState/{tableName}/resumeRealtimeTable")
+ @Path("/tables/{tableName}/resumeConsumption")
```
It can take an optional parameter of the partitions to be resumed
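A sketch of how such an optional, comma-separated partitions parameter might be parsed (the parameter format and the helper name are illustrative assumptions, not code from this PR):

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class PartitionParamSketch {
    // Hypothetical helper: parse an optional "partitions" query parameter like "0,3,7".
    // A null or blank value means "resume all partitions".
    static Set<Integer> parsePartitions(String partitionsParam) {
        if (partitionsParam == null || partitionsParam.trim().isEmpty()) {
            return Collections.emptySet(); // empty set: resume every partition
        }
        Set<Integer> partitionIds = new LinkedHashSet<>();
        for (String token : partitionsParam.split(",")) {
            partitionIds.add(Integer.parseInt(token.trim()));
        }
        return partitionIds;
    }

    public static void main(String[] args) {
        System.out.println(parsePartitions("0, 3,7")); // [0, 3, 7]
        System.out.println(parsePartitions(null));     // []
    }
}
```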
Force-pushed: dffc867 → 497d205
Jackie-Jiang left a comment:
Add @sajjad-moradi to also take a look
sajjad-moradi left a comment:
There is a design doc for a broader topic, "pause/resume consumption on realtime table". The functionality of the endpoint provided in this PR will be part of the resume endpoint mentioned in the design doc.
If there's no urgent need for this functionality, I'm going to work on pause/resume soon, so we don't need to refactor the code in this PR when pause/resume is implemented.
But if this functionality is needed ASAP, for now we can have it in the resume endpoint, and later on, when we implement the broader pause/resume feature, we should make sure that the resume endpoint works for paused tables and also supports this PR's functionality. We should be careful that the API is forward-compatible with pause/resume.
@sajjad-moradi I had a discussion with @npawar on this, and it seems this functionality addresses issues arising from accidental deletion of consuming segments; therefore its availability is important beyond the pause/resume feature too. cc: @npawar for comments
Force-pushed: 497d205 → a2e6d13
Not really. I saw that in the PR, metadata is used instead of ideal state. So the changes needed are very similar to the resume endpoint in the design doc. Just wanted to make sure these two are aligned. Will review your changes shortly.
sajjad-moradi left a comment:
It's not easy to add a unit test here. Could you please add a small section to the PR description on how you tested the new functionality?
createNewSegmentZKMetadata can be used here.
tableConfig.getTableName will always give you tableNameWithType
Are you intentionally including all new partition groups, instead of just the ones that were deleted and the user wants to restore? I don't think it's a good idea to introduce yet another place where all new partition groups are read to start consumption (it already happens in ensureAllPartitionsConsuming).
Intentional, yes. But we can discuss this offline. The idea was to keep this as a generic resumeConsumption API, which also happens to double up as a fix for deleted segments.
I see. But it's not going to be a generic resume consumption API unless all the other cases from ensureAllPartitionsConsuming are also included. Now we have two ways to resume consumption, both handling only partial cases.
What do you think if we make the following changes to the ensureAllPartitionsConsuming method:

```java
ensureAllPartitionsConsuming(..., boolean enforcedByAdmin) {
  ...
  if (enforcedByAdmin) {
    // handle the case where a realtime segment is manually deleted
  }
  ...
}
```

Then this method is called in the resume endpoint with enforcedByAdmin = true, and in the periodic task with enforcedByAdmin = false.
Later if we have more cases that require admin intervention, we can add them to the if (enforcedByAdmin) {...} block.
This is a great idea. +1 on doing it this way.
Ack. This question turned out to be redundant, since creating new consuming segments for new partitions is already handled by ensureAllPartitionsConsuming.
All in all, the API does everything ensureAllPartitionsConsuming already does, and additionally creates consuming segments for partitions whose CONSUMING segments were deleted.
IMO, having a single enforcedByAdmin flag can be quite confusing and hard to use, because this module (the segment manager) should not be aware of the intention of the admin. I'd suggest making the flag specific and self-explanatory, such as recreateDeletedConsumingSegment. In the future, if we want to add other checks, we can introduce other flags, or wrap the flags into an options class, but each flag should be specific. @sajjad-moradi @npawar What's your opinion on this suggestion?
My thought was also a generic name like "fixAllCases" or "fixManualErrors". But I see your point; a self-explanatory name would be a good start, and we can introduce more as and when needed.
That's a valid point. We can start with this specific flag now, and later on, when we add more cases, we can rename the flag to be more generic or add more options.
Ack. Renaming the flag. Wrapping these flags into a Config / Props class eventually sounds good to me.
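A minimal sketch of the specific-flag approach agreed on above, assuming a Properties-based context setup (the key name and the Context stand-in mirror the discussion but are simplified, not the actual Pinot classes):

```java
import java.util.Properties;

public class FlagSketch {
    // Hypothetical property key; the real constant lives in the PR, not here.
    static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";

    // Minimal stand-in for the periodic task's Context class.
    static class Context {
        boolean _recreateDeletedConsumingSegment;
    }

    // Each flag is specific and self-explanatory; more flags (or an options
    // class wrapping them) can be added later without overloading one boolean.
    static Context buildContext(Properties periodicTaskProperties) {
        Context context = new Context();
        context._recreateDeletedConsumingSegment = Boolean.parseBoolean(
            periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
        return context;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
        System.out.println(buildContext(props)._recreateDeletedConsumingSegment);           // true
        System.out.println(buildContext(new Properties())._recreateDeletedConsumingSegment); // false
    }
}
```

Note that Boolean.parseBoolean(null) returns false, so an absent key safely defaults to the periodic-task behavior.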
We have to ensure that this change plays nicely with the code in ensureAllPartitionsConsuming. That method also creates a new zkMetadata and idealState entry, and these two should not interfere with each other. You'll have to add some synchronization on table name between the two methods.
With the redesign, this is achieved. Although I had to serialize calls to ensureAllPartitionsConsuming, since there is a possibility of the API call running concurrently with the periodic task. I think we can improve upon this by having table-level locks acquired within the function. Have added that as a todo, since it should not be much of an issue, I feel.
ensureAllPartitionsConsuming called in the periodic task only runs on the lead controller of each table, while this new resume API can be executed on any controller, so locking/synchronization is not much of a help here.
Makes sense. TableName-level locks won't help much in that case. But synchronizing the method is still needed? To address the, albeit rare, possibility of the API being hit on the lead controller while the periodic task is running.
Since it's a background periodic task, the overhead of the synchronized function shouldn't be much, I feel. Any suggestions on this?
Synchronization is still needed for the same controller, yes, but the other case is more concerning. For instance, on the two controllers (one via the periodic task thread, the other via the resumeConsumption thread), we could enter the setupNewPartitionGroup method at once, and end up with 2 zk metadata entries and 2 CONSUMING segments for a new partition. Many more such cases could come up.
Wondering now if adding a back-door entry to this method from any controller is a good idea. This won't happen if only the periodic task is allowed to execute that method (be it scheduled or manual). So 1) should this flag just be part of the manual periodic task trigger options, and then 2) this API internally just invokes the periodic task?
Good point. So it can potentially lead to a race condition if 2 runs happen at exactly the same time, and both create the new segment ZK metadata. One of the segment ZK metadata entries will be picked and continue; the other will be left orphan.
@saurabhd336 Let's change ensureAllPartitionsConsuming to synchronized, and add a TODO describing this potential race condition. We should probably clean up the orphan segment ZK metadata if it is not in the ideal state. This can be handled in a separate PR.
We don't need to do so if we go with manually kicking off the RealtimeSegmentValidation job as Neha suggested. The code in PinotControllerPeriodicTaskRestletResource can be modified a bit to achieve this.
@sajjad-moradi Not sure I understand 'manually kicking off the RealtimeSegmentValidation job'. That still means ensureAllPartitionsConsuming can get called concurrently by the API thread and the scheduled thread?
@saurabhd336 Just think of the periodic task. There's an API to invoke a periodic task manually (look for the Periodic Task tab in Swagger). Whether it runs by schedule or by manual trigger, a periodic task can only run on the lead controller of a table, so that eliminates cross-controller clashes. As for the same controller, if you look at BasePeriodicTask, you'll see it doesn't let more than one task run at once.
So the suggestion is: rely on this mechanism, as we already know it works and prevents clashes. One way to rely on it is by introducing a query param to the runPeriodicTask API with our extra config and passing it to the periodic task execution. Another way could be to keep your new API, but again use the same mechanism as the runPeriodicTask API to trigger the task.
Does this help clarify? We can chat on the Slack channel if need be.
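The single-run guarantee described above can be illustrated with a compare-and-set guard. This is a simplified stand-in for the behavior of BasePeriodicTask, not its actual code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleRunGuard {
    private final AtomicBoolean _running = new AtomicBoolean(false);

    // Returns true if this call actually ran the task; false if another run
    // (scheduled or manually triggered) was already in progress on this controller.
    public boolean runIfIdle(Runnable task) {
        if (!_running.compareAndSet(false, true)) {
            return false; // a run is already in flight; skip rather than queue
        }
        try {
            task.run();
            return true;
        } finally {
            _running.set(false);
        }
    }

    public static void main(String[] args) {
        SingleRunGuard guard = new SingleRunGuard();
        // A second invocation attempted while the first is running is rejected.
        boolean ran = guard.runIfIdle(() -> System.out.println(guard.runIfIdle(() -> {}))); // false
        System.out.println(ran); // true
    }
}
```

Combined with the lead-controller routing, this is why clashes cannot happen when all entry points go through the periodic task machinery.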
I did not have full context on periodic task execution via the API. But now I've updated the PR with:
- Allowing custom params to be passed to on-demand task execution via Helix messages.
- Changing the API to use the periodic task execution instead of directly calling the method.

Please have a look @npawar @Jackie-Jiang @sajjad-moradi
Force-pushed: 51b4dc1 → 5f0fd4d
@npawar @Jackie-Jiang @sajjad-moradi I've updated the PR as per the comments. Please have a look.
Force-pushed: 6004072 → 256e678
Can we extract some common logic to a separate helper method? It is almost identical to the all-OFFLINE scenario, except that it uses the end offset of the latest segment instead of the start offset.
Added a new function for this
It should work without synchronization because it is performing a check and write
Let's update the javadoc for this method to include the new scenario:
If the consuming segment is deleted:
Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment with status IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE.
(Note that this is very similar to the failure between step-1 and step-2; the only difference is the state in the ideal state.)
> It should work without synchronization because it is performing a check and write

Wasn't aware of this. Looks like Helix itself guarantees concurrent updates are handled safely. Removed synchronization.

> Let's update the javadoc for this method to include the new scenario:
> If the consuming segment is deleted: Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment with status IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE. (Note that this is very similar to the failure between step-1 and step-2; the only difference is the state in the ideal state.)

Ack. Added.
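The check-and-write behavior being relied on here can be illustrated with an atomic create-if-absent, where only one of two concurrent writers succeeds. This uses an in-memory map as a simplified analogy, not the actual Helix property-store API:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CheckAndWriteSketch {
    // Stand-in for a ZK-backed store whose create fails if the path already exists.
    private final ConcurrentMap<String, String> _store = new ConcurrentHashMap<>();

    // Returns true only for the writer that created the entry; a concurrent
    // duplicate create fails instead of overwriting, so no external lock is needed.
    public boolean createIfAbsent(String path, String metadata) {
        return _store.putIfAbsent(path, metadata) == null;
    }

    public static void main(String[] args) {
        CheckAndWriteSketch store = new CheckAndWriteSketch();
        // Hypothetical segment path, for illustration only.
        System.out.println(store.createIfAbsent("/segments/seg__0__5", "IN_PROGRESS")); // true
        System.out.println(store.createIfAbsent("/segments/seg__0__5", "IN_PROGRESS")); // false
    }
}
```

The losing writer observes the failure and backs off, which is why the check-and-write pattern makes method-level synchronization unnecessary.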
The new scenario is very similar to case 1, so we can keep them together:
We first check if the latestSegmentZKMetadata is DONE, then check if the instanceStateMap contains a CONSUMING segment.
(minor) Change the index (currently there are two entries with index 2).
```diff
- // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has
+ // If we get here, that means in IdealState, the latest segment has all replicas ONLINE
```
Let's add some checks before fixing the segment:
- latestSegmentZKMetadata.getStatus() == Status.DONE
- isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)
Force-pushed: 3326983 → 473b1e9
Jackie-Jiang left a comment:
LGTM. @npawar @sajjad-moradi Can you please also take another look?
(nit)

```diff
- // If we get here, that means in IdealState, the latest segment has all replicas ONLINE, but no
- // CONSUMING segments.
+ // If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
```
(code style) Let's reformat this part
Force-pushed: 473b1e9 → 57e5d47
We may take a Map<String, String> here and use the map field from the ZNRecord
Suggest naming it "properties" to be consistent with the name in RunPeriodicTaskMessageHandler
Ack. Renamed to 'taskProperties'
Here we can write the key-value pairs of the map into the Properties
(minor) Make "RealtimeSegmentValidationManager" a constant
Suggest making it more explicit that this is a resume consumption request
Suggest passing the whole Properties into the processTables to be more flexible
Same here, passing the whole Properties into preprocess
We shouldn't need to catch this exception when the key-values are already put into the properties
Moving to Properties instead of JSON means preprocess no longer throws exceptions. Ack.
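The map-to-Properties conversion discussed above is essentially a one-liner, since java.util.Properties extends Hashtable. A minimal sketch (the surrounding message-handling code is omitted, and the key name is illustrative only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class MapToPropertiesSketch {
    // Copy the string key-value pairs from a ZNRecord-style map into the
    // Properties handed to the periodic task. There is no JSON round-trip,
    // so the conversion itself cannot throw a parse exception.
    static Properties toProperties(Map<String, String> taskProperties) {
        Properties properties = new Properties();
        if (taskProperties != null) {
            properties.putAll(taskProperties);
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("recreateDeletedConsumingSegment", "true"); // illustrative key
        System.out.println(toProperties(map).getProperty("recreateDeletedConsumingSegment")); // true
    }
}
```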
This is already an inner class of RealtimeSegmentValidationManager, so no need to have a separate wrapper class for the parameters. Suggest directly adding _recreateDeletedConsumingSegment into the Context class
This part is almost identical to the PinotControllerPeriodicTaskRestletResource, let's add a TODO here to extract the common part in the future
IMO this is a technical debt. The code as well as comments are duplicated here. We can simply refactor it to a class (or a util method), something like PeriodicTaskTriggerer.sendRunPeriodicTaskMessage() or PeriodicTaskTriggerUtil.sendRunPeriodicTaskMessage().
Ack. PinotHelixResourceManager seemed like the most natural place for the method to reside. Have moved it there and changed both the classes to use it. Please check.
Mention here that this happens only if the special flag is set
Force-pushed: 96d9504 → 777ce7a
Force-pushed: 777ce7a → a74f130
sajjad-moradi left a comment:
LGTM, with a few minor comments.
Move the new comments here. Up to this line, the comments explain different controller failure cases in the segment commit protocol. Let's not break those cases apart. Also, let's change it to "If the consuming segment is deleted by user, intentionally or by mistake: ..."
Maybe add "... is set to true, which means it's manually triggered by admin, not by the automatic periodic task".
Move this to the next if statement.
Please add a
(minor) Can be simplified:

```diff
- if (periodicTaskProperties.containsKey(RECREATE_DELETED_CONSUMING_SEGMENT_KEY)) {
-   context._recreateDeletedConsumingSegment =
-       Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
- }
+ context._recreateDeletedConsumingSegment =
+     Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
```
Force-pushed: e7e15da → 41fb036
This PR adds controller API support for resuming consumption of realtime tables after deletion of the CONSUMING segment from ZK.
GH issue: #6679
Testing Done
Recovery from CONSUMING segment deleted via API
- Running the RealtimeSegmentValidationManager task, either via the periodictask/run endpoint or through the auto-scheduled task, doesn't fix the table state.
- The /tables/{tableName}/resumeConsumption API fixes it by creating new IN_PROGRESS and CONSUMING segments in the PROPERTYSTORE and IDEALSTATE respectively.

Recovery from CONSUMING segment deleted manually
- All steps remain the same as above, except that the consuming segment is deleted from ZK manually via the controller UI / a ZK client.
cc: @sajjad-moradi