Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,14 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t

return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
}

public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
// If the instance assignment config is not null and the partition selector is
// MIRROR_SERVER_SET_PARTITION_SELECTOR,
return tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()) != null
&& InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, instancePartitionsType)
.getPartitionSelector()
== InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
import static org.apache.pinot.spi.utils.CommonConstants.*;


@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
Expand Down Expand Up @@ -244,20 +244,38 @@ public Map<String, InstancePartitions> assignInstances(
private void assignInstancesForInstancePartitionsType(Map<String, InstancePartitions> instancePartitionsMap,
TableConfig tableConfig, List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
instancePartitionsMap.put(instancePartitionsType.toString(),
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName)));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions));
} else {
if (InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType)) {
// fetch the existing instance partitions, if the table, this is referenced in the new instance partitions
// generation for minimum difference
InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions(
_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
instancePartitionsMap.put(instancePartitionsType.toString(),
new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions));
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
// fetch the pre-configured instance partitions, the renaming part is irrelevant as we are not really
// preserving this preConfigured, but only using it as a reference to generate the new instance partitions
InstancePartitions preConfigured =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName));
instancePartitionsMap.put(instancePartitionsType.toString(),
new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions, preConfigured));
} else {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
instancePartitionsMap.put(instancePartitionsType.toString(),
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName)));
}
}
}

private void assignInstancesForTier(Map<String, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -50,6 +51,8 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
Expand All @@ -68,13 +71,14 @@
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
import static org.apache.pinot.spi.utils.CommonConstants.*;


/**
Expand Down Expand Up @@ -286,6 +290,82 @@ public String getTablesOnTenant(
}
}

@GET
@Path("/tenants/{tenantName}/instancePartitions")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_INSTANCE_PARTITIONS)
@Authenticate(AccessType.READ)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the instance partitions of a tenant")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Success", response = InstancePartitions.class),
@ApiResponse(code = 404, message = "Instance partitions not found")})
public InstancePartitions getInstancePartitions(
@ApiParam(value = "Tenant name ", required = true) @PathParam("tenantName") String tenantName,
@ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", required = true,
allowableValues = "OFFLINE, CONSUMING, COMPLETED")
@QueryParam("instancePartitionType") String instancePartitionType) {
String tenantNameWithType = InstancePartitionsType.valueOf(instancePartitionType)
.getInstancePartitionsName(tenantName);
InstancePartitions instancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
tenantNameWithType);

if (instancePartitions == null) {
throw new ControllerApplicationException(LOGGER,
String.format("Failed to find the instance partitions for %s", tenantNameWithType),
Response.Status.NOT_FOUND);
} else {
return instancePartitions;
}
}

@PUT
@Path("/tenants/{tenantName}/instancePartitions")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_INSTANCE_PARTITIONS)
@Authenticate(AccessType.UPDATE)
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Update an instance partition for a server type in a tenant")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Success", response = InstancePartitions.class),
@ApiResponse(code = 400, message = "Failed to deserialize/validate the instance partitions"),
@ApiResponse(code = 500, message = "Error updating the tenant")})
public InstancePartitions assignInstancesPartitionMap(
@ApiParam(value = "Tenant name ", required = true) @PathParam("tenantName") String tenantName,
@ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", required = true,
allowableValues = "OFFLINE, CONSUMING, COMPLETED")
@QueryParam("instancePartitionType") String instancePartitionType,
String instancePartitionsStr) {
InstancePartitions instancePartitions;
try {
instancePartitions = JsonUtils.stringToObject(instancePartitionsStr, InstancePartitions.class);
} catch (IOException e) {
throw new ControllerApplicationException(LOGGER, "Failed to deserialize the instance partitions",
Response.Status.BAD_REQUEST);
}

String inputTenantName = InstancePartitionsType.valueOf(instancePartitionType)
.getInstancePartitionsName(tenantName);

if (!instancePartitions.getInstancePartitionsName().equals(inputTenantName)) {
throw new ControllerApplicationException(LOGGER, "Instance partitions name mismatch, expected: "
+ inputTenantName
+ ", got: " + instancePartitions.getInstancePartitionsName(), Response.Status.BAD_REQUEST);
}

persistInstancePartitionsHelper(instancePartitions);
return instancePartitions;
}

private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
try {
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
instancePartitions);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

private String getTablesServedFromServerTenant(String tenantName) {
Set<String> tables = new HashSet<>();
ObjectNode resourceGetRet = JsonUtils.newObjectNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,20 +1735,30 @@ private void assignInstances(TableConfig tableConfig, boolean override) {
for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType);
boolean isPreConfigurationBasedAssignment =
InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType);
InstancePartitions instancePartitions;
if (!hasPreConfiguredInstancePartitions) {
instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
} else {
String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
if (isPreConfigurationBasedAssignment) {
InstancePartitions preConfiguredInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null,
preConfiguredInstancePartitions);
LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions,
preConfiguredInstancePartitions);
} else {
instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
}
}
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,31 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
assignmentConfig, instanceConfigs, existingInstancePartitions);
assignmentConfig, instanceConfigs, existingInstancePartitions, null);
}

public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can the existing assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) call this function since the remaining setup is the same? just to avoid too much code duplication

List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions);
}

public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null);
}

private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions) {
@Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);

Expand All @@ -93,7 +105,8 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName,

InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions);
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
preConfiguredInstancePartitions);
InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ private InstancePartitionSelectorFactory() {

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions
InstancePartitions existingInstancePartitions) {
return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, null);
}

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
) {
switch (partitionSelector) {
case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
Expand All @@ -40,6 +47,9 @@ public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.Par
case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
case MIRROR_SERVER_SET_PARTITION_SELECTOR:
return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, preConfiguredInstancePartitions);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
+ Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
Expand Down
Loading