-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Instance retag validation check api #11077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b1c967b
fa74d41
1c00c2e
6831db1
336fe91
973a1d8
e059a3e
28d8434
71686cd
e3ef195
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.controller.api.resources; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import io.swagger.annotations.ApiModel; | ||
| import io.swagger.annotations.ApiModelProperty; | ||
| import java.util.List; | ||
|
|
||
|
|
||
| @ApiModel | ||
| public class InstanceTagUpdateRequest { | ||
| @JsonProperty("instanceName") | ||
| @ApiModelProperty(example = "Server_a.b.com_20000") | ||
| private String _instanceName; | ||
| @JsonProperty("newTags") | ||
| private List<String> _newTags; | ||
|
|
||
| public String getInstanceName() { | ||
| return _instanceName; | ||
| } | ||
|
|
||
| public void setInstanceName(String instanceName) { | ||
| _instanceName = instanceName; | ||
| } | ||
|
|
||
| public List<String> getNewTags() { | ||
| return _newTags; | ||
| } | ||
|
|
||
| public void setNewTags(List<String> newTags) { | ||
| _newTags = newTags; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
|
|
||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.fasterxml.jackson.databind.node.ObjectNode; | ||
| import com.google.common.collect.Sets; | ||
| import io.swagger.annotations.Api; | ||
| import io.swagger.annotations.ApiKeyAuthDefinition; | ||
| import io.swagger.annotations.ApiOperation; | ||
|
|
@@ -29,8 +30,12 @@ | |
| import io.swagger.annotations.Authorization; | ||
| import io.swagger.annotations.SecurityDefinition; | ||
| import io.swagger.annotations.SwaggerDefinition; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import javax.inject.Inject; | ||
| import javax.ws.rs.ClientErrorException; | ||
|
|
@@ -49,6 +54,7 @@ | |
| import javax.ws.rs.core.Response; | ||
| import org.apache.helix.model.InstanceConfig; | ||
| import org.apache.pinot.common.utils.config.InstanceUtils; | ||
| import org.apache.pinot.common.utils.config.TagNameUtils; | ||
| import org.apache.pinot.controller.api.access.AccessType; | ||
| import org.apache.pinot.controller.api.access.Authenticate; | ||
| import org.apache.pinot.controller.api.exception.ControllerApplicationException; | ||
|
|
@@ -416,4 +422,147 @@ public List<OperationValidationResponse> instanceDropSafetyCheck( | |
| Response.Status.INTERNAL_SERVER_ERROR, e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Endpoint to validate the safety of instance tag update requests. | ||
| * This is to ensure that any instance tag update operation that user wants to perform is safe and does not create any | ||
| * side effect on the cluster and disturb the cluster consistency. | ||
| * This operation does not perform any changes to the cluster, but surfaces the possible issues which might occur upon | ||
| * applying the intended changes. | ||
| * @param requests list if instance tag update requests | ||
| * @return list of {@link OperationValidationResponse} which denotes the validity of each request along with listing | ||
| * the issues if any. | ||
| */ | ||
| @POST | ||
| @Path("/instances/updateTags/validate") | ||
| @Produces(MediaType.APPLICATION_JSON) | ||
| @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.") | ||
| @ApiResponses(value = { | ||
| @ApiResponse(code = 200, message = "Success"), | ||
| @ApiResponse(code = 500, message = "Internal error") | ||
| }) | ||
| public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> requests) { | ||
| LOGGER.info("Performing safety check on tag update request received for instances: {}", | ||
| requests.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList())); | ||
| Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags(); | ||
| Map<String, Integer> tagToInstanceCountMap = getUpdatedTagToInstanceCountMap(requests); | ||
| Map<String, Integer> tagDeficiency = computeTagDeficiency(tagToInstanceCountMap, tagMinServerMap); | ||
|
|
||
| Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(requests.size()); | ||
| List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>(); | ||
| requests.forEach(request -> responseMap.put(request.getInstanceName(), new ArrayList<>())); | ||
| for (InstanceTagUpdateRequest request : requests) { | ||
| String name = request.getInstanceName(); | ||
| Set<String> oldTags; | ||
| try { | ||
| oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name)); | ||
| } catch (NullPointerException exception) { | ||
| throw new ControllerApplicationException(LOGGER, | ||
| String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED); | ||
| } | ||
| Set<String> newTags = new HashSet<>(request.getNewTags()); | ||
| // tags removed from instance | ||
| for (String tag : Sets.difference(oldTags, newTags)) { | ||
snleee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Integer deficiency = tagDeficiency.get(tag); | ||
| if (deficiency != null && deficiency > 0) { | ||
| String tenant = TagNameUtils.getTenantFromTag(tag); | ||
| String tagType = getInstanceTypeFromTag(tag); | ||
| responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper( | ||
| OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name)); | ||
| tagDeficiency.put(tag, deficiency - 1); | ||
| } | ||
| } | ||
| // newly added tags to instance | ||
| for (String tag : newTags) { | ||
| String tagType = getInstanceTypeFromTag(tag); | ||
| if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) | ||
| || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) { | ||
| responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper( | ||
| OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag)); | ||
| continue; | ||
| } | ||
| Integer deficiency = tagDeficiency.get(tag); | ||
| if (deficiency != null && deficiency > 0) { | ||
| tenantIssues.add(new OperationValidationResponse.ErrorWrapper( | ||
| OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference between Aren't they essentially the same error?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I have made this segregation so that user is aware about the existing deficiencies and not wonder why the validation is failing even though we are adding instances to a tenant. |
||
| tagType, deficiency.toString(), name)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // consolidate all the issues based on instances | ||
| List<OperationValidationResponse> response = new ArrayList<>(requests.size()); | ||
| responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty() | ||
| ? new OperationValidationResponse().setInstanceName(instance).setSafe(true) | ||
| : new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false))); | ||
| // separate entry to group all the deficient tenant issues as it's not related to any instance | ||
| if (!tenantIssues.isEmpty()) { | ||
| response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false)); | ||
| } | ||
| return response; | ||
| } | ||
|
|
||
| private String getInstanceTypeFromTag(String tag) { | ||
| if (TagNameUtils.isServerTag(tag)) { | ||
| return "server"; | ||
| } else if (TagNameUtils.isBrokerTag(tag)) { | ||
| return "broker"; | ||
| } else { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Compute the number of deficient instances for each tag. | ||
| * The utility accepts two maps | ||
| * - map of tags and count of their intended tagged instances | ||
| * - map of tags and their minimum number of instance requirements | ||
| * And then compares these two maps to return a map of tags and the number of their deficient instances. | ||
| * | ||
| * @param tagToInstanceCountMap tags and count of their intended tagged instances | ||
| * @param tagToMinInstanceCountMap tags and their minimum number of instance requirements | ||
| * @return tags and the number of their deficient instances | ||
| */ | ||
| private Map<String, Integer> computeTagDeficiency(Map<String, Integer> tagToInstanceCountMap, | ||
| Map<String, Integer> tagToMinInstanceCountMap) { | ||
| Map<String, Integer> tagDeficiency = new HashMap<>(); | ||
| Map<String, Integer> tagToInstanceCountMapCopy = new HashMap<>(tagToInstanceCountMap); | ||
| // compute deficiency for each of the minimum instance requirement entry | ||
| tagToMinInstanceCountMap.forEach((tag, minInstances) -> { | ||
| Integer updatedInstances = tagToInstanceCountMapCopy.remove(tag); | ||
| // if tag is not present in the provided map its considered as if tag is not assigned to any instance | ||
| // hence deficiency = minimum instance requirement. | ||
| tagDeficiency.put(tag, minInstances - (updatedInstances != null ? updatedInstances : 0)); | ||
| }); | ||
| // tags for which minimum instance requirement is not specified are assumed to have no deficiency (deficiency = 0) | ||
| tagToInstanceCountMapCopy.forEach((tag, updatedInstances) -> tagDeficiency.put(tag, 0)); | ||
| return tagDeficiency; | ||
| } | ||
|
|
||
| /** | ||
| * Utility to fetch the existing tags and count of their respective tagged instances and then apply the changes based | ||
| * on the provided list of {@link InstanceTagUpdateRequest} to get the updated map of tags and count of their | ||
| * respective tagged instances | ||
| * @param requests list of {@link InstanceTagUpdateRequest} | ||
| * @return map of tags and updated count of their respective tagged instances | ||
| */ | ||
| private Map<String, Integer> getUpdatedTagToInstanceCountMap(List<InstanceTagUpdateRequest> requests) { | ||
| Map<String, Integer> updatedTagInstanceMap = new HashMap<>(); | ||
| Set<String> visitedInstances = new HashSet<>(); | ||
| // build the map of tags and their respective instance counts from the given tag update request list | ||
| requests.forEach(instance -> { | ||
| instance.getNewTags().forEach(tag -> | ||
| updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1)); | ||
| visitedInstances.add(instance.getInstanceName()); | ||
| }); | ||
| // add the instance counts to tags for the rest of the instances apart from the ones mentioned in requests | ||
| _pinotHelixResourceManager.getAllInstances().forEach(instance -> { | ||
| if (!visitedInstances.contains(instance)) { | ||
| _pinotHelixResourceManager.getTagsForInstance(instance).forEach(tag -> | ||
| updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1)); | ||
| visitedInstances.add(instance); | ||
| } | ||
| }); | ||
| return updatedTagInstanceMap; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we compute minInstances only for tags that show up in the request?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just the tags in request won't do, we also need to fetch the existing/old tags too for minInstances. We can pass a list of tags to get required minInstances but anyway we are forced to parse all the table configs hence didn’t complicate things for now. Let me know if accepting the tags list makes more sense and I will be happy to do it that way.