Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public static String getRealtimeTagForTenant(@Nullable String tenantName) {
return getTagForTenant(tenantName, REALTIME_SERVER_TAG_SUFFIX);
}

/**
* Returns the server tag name for the given tenant and the given table type.
*/
public static String getServerTagForTenant(@Nullable String tenantName, TableType type) {
return getTagForTenant(tenantName, String.format("_%s", type));
}

private static String getTagForTenant(@Nullable String tenantName, String tagSuffix) {
if (tenantName == null) {
return DEFAULT_TENANT_NAME + tagSuffix;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;


@ApiModel
public class InstanceTagUpdateRequest {
@JsonProperty("instanceName")
@ApiModelProperty(example = "Server_a.b.com_20000")
private String _instanceName;
@JsonProperty("newTags")
private List<String> _newTags;

public String getInstanceName() {
return _instanceName;
}

public void setInstanceName(String instanceName) {
_instanceName = instanceName;
}

public List<String> getNewTags() {
return _newTags;
}

public void setNewTags(List<String> newTags) {
_newTags = newTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,24 @@ public OperationValidationResponse putIssue(ErrorCode code, String... args) {
return this;
}

public OperationValidationResponse putAllIssues(List<ErrorWrapper> issues) {
_issues.addAll(issues);
return this;
}

public String getIssueMessage(int index) {
return _issues.get(index).getMessage();
}

public static class ErrorWrapper {
@JsonProperty("code")
ErrorCode _code;
@JsonProperty("message")
String _message;

public ErrorWrapper() {
}

public ErrorWrapper(ErrorCode code, String... args) {
_code = code;
_message = String.format(code._description, args);
Expand All @@ -82,7 +92,11 @@ public String getMessage() {

public enum ErrorCode {
IS_ALIVE("Instance %s is still live"),
CONTAINS_RESOURCE("Instance %s exists in ideal state for %s");
CONTAINS_RESOURCE("Instance %s exists in ideal state for %s"),
MINIMUM_INSTANCE_UNSATISFIED(
"Tenant '%s' will not satisfy minimum '%s' requirement if tag '%s' is removed from %s instance '%s'."),
ALREADY_DEFICIENT_TENANT("Tenant '%s' is low on '%s' instances by %s even after allocating instance %s"),
UNRECOGNISED_TAG_TYPE("The tag '%s' does not follow the suffix convention of either broker or server");

public final String _description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Sets;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
Expand All @@ -29,8 +30,12 @@
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.ClientErrorException;
Expand All @@ -49,6 +54,7 @@
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
Expand Down Expand Up @@ -416,4 +422,147 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

/**
* Endpoint to validate the safety of instance tag update requests.
* This is to ensure that any instance tag update operation that user wants to perform is safe and does not create any
* side effect on the cluster and disturb the cluster consistency.
* This operation does not perform any changes to the cluster, but surfaces the possible issues which might occur upon
* applying the intended changes.
* @param requests list if instance tag update requests
* @return list of {@link OperationValidationResponse} which denotes the validity of each request along with listing
* the issues if any.
*/
@POST
@Path("/instances/updateTags/validate")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal error")
})
public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> requests) {
LOGGER.info("Performing safety check on tag update request received for instances: {}",
requests.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we compute minInstances only for tags that show up in the request?

Copy link
Collaborator Author

@shounakmk219 shounakmk219 Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the tags in request won't do, we also need to fetch the existing/old tags too for minInstances. We can pass a list of tags to get required minInstances but anyway we are forced to parse all the table configs hence didn’t complicate things for now. Let me know if accepting the tags list makes more sense and I will be happy to do it that way.

Map<String, Integer> tagToInstanceCountMap = getUpdatedTagToInstanceCountMap(requests);
Map<String, Integer> tagDeficiency = computeTagDeficiency(tagToInstanceCountMap, tagMinServerMap);

Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(requests.size());
List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
requests.forEach(request -> responseMap.put(request.getInstanceName(), new ArrayList<>()));
for (InstanceTagUpdateRequest request : requests) {
String name = request.getInstanceName();
Set<String> oldTags;
try {
oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
} catch (NullPointerException exception) {
throw new ControllerApplicationException(LOGGER,
String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
}
Set<String> newTags = new HashSet<>(request.getNewTags());
// tags removed from instance
for (String tag : Sets.difference(oldTags, newTags)) {
Integer deficiency = tagDeficiency.get(tag);
if (deficiency != null && deficiency > 0) {
String tenant = TagNameUtils.getTenantFromTag(tag);
String tagType = getInstanceTypeFromTag(tag);
responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name));
tagDeficiency.put(tag, deficiency - 1);
}
}
// newly added tags to instance
for (String tag : newTags) {
String tagType = getInstanceTypeFromTag(tag);
if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
|| name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) {
responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag));
continue;
}
Integer deficiency = tagDeficiency.get(tag);
if (deficiency != null && deficiency > 0) {
tenantIssues.add(new OperationValidationResponse.ErrorWrapper(
OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between ALREADY_DEFICIENT_TENANT vs MINIMUM_INSTANCE_UNSATISFIED?

Aren't they essentially the same error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINIMUM_INSTANCE_UNSATISFIED -> The tenant currently satisfies minimum instances requirement, but the validation request removes few instances from it due to which the tenant ends up with instance deficiency.
ALREADY_DEFICIENT_TENANT -> The tenant is already deficient on instances, and even though the validation request adds few instances to it, its still deficient.

I have made this segregation so that user is aware about the existing deficiencies and not wonder why the validation is failing even though we are adding instances to a tenant.

tagType, deficiency.toString(), name));
}
}
}

// consolidate all the issues based on instances
List<OperationValidationResponse> response = new ArrayList<>(requests.size());
responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty()
? new OperationValidationResponse().setInstanceName(instance).setSafe(true)
: new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false)));
// separate entry to group all the deficient tenant issues as it's not related to any instance
if (!tenantIssues.isEmpty()) {
response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false));
}
return response;
}

private String getInstanceTypeFromTag(String tag) {
if (TagNameUtils.isServerTag(tag)) {
return "server";
} else if (TagNameUtils.isBrokerTag(tag)) {
return "broker";
} else {
return null;
}
}

/**
* Compute the number of deficient instances for each tag.
* The utility accepts two maps
* - map of tags and count of their intended tagged instances
* - map of tags and their minimum number of instance requirements
* And then compares these two maps to return a map of tags and the number of their deficient instances.
*
* @param tagToInstanceCountMap tags and count of their intended tagged instances
* @param tagToMinInstanceCountMap tags and their minimum number of instance requirements
* @return tags and the number of their deficient instances
*/
private Map<String, Integer> computeTagDeficiency(Map<String, Integer> tagToInstanceCountMap,
Map<String, Integer> tagToMinInstanceCountMap) {
Map<String, Integer> tagDeficiency = new HashMap<>();
Map<String, Integer> tagToInstanceCountMapCopy = new HashMap<>(tagToInstanceCountMap);
// compute deficiency for each of the minimum instance requirement entry
tagToMinInstanceCountMap.forEach((tag, minInstances) -> {
Integer updatedInstances = tagToInstanceCountMapCopy.remove(tag);
// if tag is not present in the provided map its considered as if tag is not assigned to any instance
// hence deficiency = minimum instance requirement.
tagDeficiency.put(tag, minInstances - (updatedInstances != null ? updatedInstances : 0));
});
// tags for which minimum instance requirement is not specified are assumed to have no deficiency (deficiency = 0)
tagToInstanceCountMapCopy.forEach((tag, updatedInstances) -> tagDeficiency.put(tag, 0));
return tagDeficiency;
}

/**
* Utility to fetch the existing tags and count of their respective tagged instances and then apply the changes based
* on the provided list of {@link InstanceTagUpdateRequest} to get the updated map of tags and count of their
* respective tagged instances
* @param requests list of {@link InstanceTagUpdateRequest}
* @return map of tags and updated count of their respective tagged instances
*/
private Map<String, Integer> getUpdatedTagToInstanceCountMap(List<InstanceTagUpdateRequest> requests) {
Map<String, Integer> updatedTagInstanceMap = new HashMap<>();
Set<String> visitedInstances = new HashSet<>();
// build the map of tags and their respective instance counts from the given tag update request list
requests.forEach(instance -> {
instance.getNewTags().forEach(tag ->
updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1));
visitedInstances.add(instance.getInstanceName());
});
// add the instance counts to tags for the rest of the instances apart from the ones mentioned in requests
_pinotHelixResourceManager.getAllInstances().forEach(instance -> {
if (!visitedInstances.contains(instance)) {
_pinotHelixResourceManager.getTagsForInstance(instance).forEach(tag ->
updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1));
visitedInstances.add(instance);
}
});
return updatedTagInstanceMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ public Set<String> getAllServerTenantNames() {
return tenantSet;
}

private List<String> getTagsForInstance(String instanceName) {
public List<String> getTagsForInstance(String instanceName) {
InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
return config.getTags();
}
Expand Down Expand Up @@ -3023,6 +3023,15 @@ public TableConfig getTableConfig(String tableNameWithType) {
return ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
}

/**
* Get all table configs.
*
* @return List of table configs. Empty list in case of tables configs does not exist.
*/
public List<TableConfig> getAllTableConfigs() {
return ZKMetadataProvider.getAllTableConfigs(_propertyStore);
}

/**
* Get the offline table config for the given table name.
*
Expand Down Expand Up @@ -4128,6 +4137,31 @@ public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableN
return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
}

/**
* Construct a map of all the tags and their respective minimum instance requirements.
* The minimum instance requirement is computed by
* - for BROKER tenant tag set it to 1 if it hosts any table else set it to 0
* - for SERVER tenant tag iterate over all the tables of that tenant and find the maximum table replication.
* - for rest of the tags just set it to 0
* @return map of tags and their minimum instance requirements
*/
public Map<String, Integer> minimumInstancesRequiredForTags() {
Map<String, Integer> tagMinInstanceMap = new HashMap<>();
for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) {
for (String tag : instanceConfig.getTags()) {
tagMinInstanceMap.put(tag, 0);
}
}
for (TableConfig tableConfig : getAllTableConfigs()) {
String tag = TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(),
tableConfig.getTableType());
tagMinInstanceMap.put(tag, Math.max(tagMinInstanceMap.getOrDefault(tag, 0), tableConfig.getReplication()));
String brokerTag = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
tagMinInstanceMap.put(brokerTag, 1);
}
return tagMinInstanceMap;
}

/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
Expand Down
Loading