Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate timeColumnName when adding/updating schema/tableConfig #5966

Merged
merged 3 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -171,7 +173,8 @@ public SuccessResponse addSchema(
public String validateSchema(FormDataMultiPart multiPart) {
Schema schema = getSchemaFromMultiPart(multiPart);
try {
SchemaUtils.validate(schema);
List<TableConfig> tableConfigs = getTableConfigsForSchema(schema.getSchemaName());
SchemaUtils.validate(schema, tableConfigs);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
"Invalid schema: " + schema.getSchemaName() + ". Reason: " + e.getMessage(), Response.Status.BAD_REQUEST, e);
Expand All @@ -188,7 +191,8 @@ public String validateSchema(FormDataMultiPart multiPart) {
@ApiResponses(value = {@ApiResponse(code = 200, message = "Successfully validated schema"), @ApiResponse(code = 400, message = "Missing or invalid request body"), @ApiResponse(code = 500, message = "Internal error")})
public String validateSchema(Schema schema) {
try {
SchemaUtils.validate(schema);
List<TableConfig> tableConfigs = getTableConfigsForSchema(schema.getSchemaName());
SchemaUtils.validate(schema, tableConfigs);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
"Invalid schema: " + schema.getSchemaName() + ". Reason: " + e.getMessage(), Response.Status.BAD_REQUEST, e);
Expand All @@ -202,25 +206,26 @@ public String validateSchema(Schema schema) {
* @param override set to true to override the existing schema with the same name
*/
private SuccessResponse addSchema(Schema schema, boolean override) {
String schemaName = schema.getSchemaName();
try {
SchemaUtils.validate(schema);
List<TableConfig> tableConfigs = getTableConfigsForSchema(schemaName);
SchemaUtils.validate(schema, tableConfigs);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
"Cannot add invalid schema: " + schema.getSchemaName() + ". Reason: " + e.getMessage(),
Response.Status.BAD_REQUEST, e);
"Cannot add invalid schema: " + schemaName + ". Reason: " + e.getMessage(), Response.Status.BAD_REQUEST, e);
}

try {
_pinotHelixResourceManager.addSchema(schema, override);
// Best effort notification. If controller fails at this point, no notification is given.
LOGGER.info("Notifying metadata event for adding new schema {}", schema.getSchemaName());
LOGGER.info("Notifying metadata event for adding new schema {}", schemaName);
_metadataEventNotifierFactory.create().notifyOnSchemaEvents(schema, SchemaEventType.CREATE);

return new SuccessResponse(schema.getSchemaName() + " successfully added");
return new SuccessResponse(schemaName + " successfully added");
} catch (Exception e) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SCHEMA_UPLOAD_ERROR, 1L);
throw new ControllerApplicationException(LOGGER,
String.format("Failed to add new schema %s.", schema.getSchemaName()), Response.Status.INTERNAL_SERVER_ERROR,
String.format("Failed to add new schema %s.", schemaName), Response.Status.INTERNAL_SERVER_ERROR,
e);
}
}
Expand All @@ -234,7 +239,8 @@ private SuccessResponse addSchema(Schema schema, boolean override) {
*/
private SuccessResponse updateSchema(String schemaName, Schema schema, boolean reload) {
try {
SchemaUtils.validate(schema);
List<TableConfig> tableConfigs = getTableConfigsForSchema(schemaName);
SchemaUtils.validate(schema, tableConfigs);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
"Cannot add invalid schema: " + schemaName + ". Reason: " + e.getMessage(),
Expand Down Expand Up @@ -318,4 +324,19 @@ private void deleteSchemaInternal(String schemaName) {
Response.Status.INTERNAL_SERVER_ERROR);
}
}

/**
 * Collects the OFFLINE and REALTIME table configs whose raw table name equals the given
 * schema name. Returns an empty list when the schema name is null or no matching table
 * config exists.
 */
private List<TableConfig> getTableConfigsForSchema(@Nullable String schemaName) {
  List<TableConfig> matchingConfigs = new ArrayList<>();
  if (schemaName == null) {
    return matchingConfigs;
  }
  TableConfig offline = _pinotHelixResourceManager.getOfflineTableConfig(schemaName);
  TableConfig realtime = _pinotHelixResourceManager.getRealtimeTableConfig(schemaName);
  if (offline != null) {
    matchingConfigs.add(offline);
  }
  if (realtime != null) {
    matchingConfigs.add(realtime);
  }
  return matchingConfigs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.util.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -58,7 +59,8 @@ public SuccessResponse updateIndexingConfig(
TableConfig tableConfig;
try {
tableConfig = JsonUtils.stringToObject(tableConfigString, TableConfig.class);
TableConfigUtils.validate(tableConfig);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Invalid table config", Response.Status.BAD_REQUEST, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.util.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,7 +56,8 @@ public SuccessResponse updateTableMetadata(@PathParam("tableName") String tableN
TableConfig tableConfig;
try {
tableConfig = JsonUtils.stringToObject(tableConfigString, TableConfig.class);
TableConfigUtils.validate(tableConfig);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Invalid table config", Response.Status.BAD_REQUEST, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,8 +114,9 @@ public SuccessResponse addTable(String tableConfigStr) {
TableConfig tableConfig;
try {
tableConfig = JsonUtils.stringToObject(tableConfigStr, TableConfig.class);
Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
// TableConfigUtils.validate(...) is used across table create/update.
TableConfigUtils.validate(tableConfig);
TableConfigUtils.validate(tableConfig, schema);
// TableConfigUtils.validateTableName(...) checks table name rules.
// So it won't affect already created tables.
TableConfigUtils.validateTableName(tableConfig);
Expand Down Expand Up @@ -325,7 +327,8 @@ public SuccessResponse updateTableConfig(
TableConfig tableConfig;
try {
tableConfig = JsonUtils.stringToObject(tableConfigString, TableConfig.class);
TableConfigUtils.validate(tableConfig);
Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Invalid table config", Response.Status.BAD_REQUEST, e);
}
Expand Down Expand Up @@ -368,7 +371,8 @@ public SuccessResponse updateTableConfig(
public String checkTableConfig(String tableConfigStr) {
try {
TableConfig tableConfig = JsonUtils.stringToObject(tableConfigStr, TableConfig.class);
TableConfigUtils.validate(tableConfig);
Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema);
ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
if (tableConfig.getTableType() == TableType.OFFLINE) {
tableConfigValidateStr.set(TableType.OFFLINE.name(), tableConfig.toJsonNode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.util.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -60,7 +61,8 @@ public SuccessResponse put(@ApiParam(value = "Table name", required = true) @Pat
TableConfig tableConfig;
try {
tableConfig = JsonUtils.stringToObject(tableConfigString, TableConfig.class);
TableConfigUtils.validate(tableConfig);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Invalid table config", Response.Status.BAD_REQUEST, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,22 @@ public Schema getTableSchema(String tableName) {
return ZKMetadataProvider.getTableSchema(_propertyStore, tableName);
}

/**
 * Finds the schema for the given table config: first by the raw table name, then by the
 * schemaName declared in the validation config. For an OFFLINE table the schema may not have
 * been uploaded before the table was created, so this method can return null.
 */
@Nullable
public Schema getSchemaForTableConfig(TableConfig tableConfig) {
  String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
  Schema schema = getSchema(rawTableName);
  if (schema != null) {
    return schema;
  }
  // Fall back to the schema name explicitly configured in the validation config, if any.
  String configuredSchemaName = tableConfig.getValidationConfig().getSchemaName();
  return configuredSchemaName == null ? null : getSchema(configuredSchemaName);
}

public List<String> getSchemaNames() {
return _propertyStore
.getChildNames(PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
Expand Down Expand Up @@ -411,10 +412,9 @@ protected Schema createDummySchema(String tableName) {
schema.setSchemaName(tableName);
schema.addField(new DimensionFieldSpec("dimA", FieldSpec.DataType.STRING, true, ""));
schema.addField(new DimensionFieldSpec("dimB", FieldSpec.DataType.STRING, true, 0));

schema.addField(new MetricFieldSpec("metricA", FieldSpec.DataType.INT, 0));
schema.addField(new MetricFieldSpec("metricB", FieldSpec.DataType.DOUBLE, -1));

schema.addField(new DateTimeFieldSpec("timeColumn", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:DAYS"));
return schema;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;
Expand All @@ -42,6 +43,22 @@ public class SchemaUtils {
public static final String MAP_KEY_COLUMN_SUFFIX = "__KEYS";
public static final String MAP_VALUE_COLUMN_SUFFIX = "__VALUES";

/**
 * Validates the schema, including its compatibility with the table configs that use it.
 * <p>First checks that the schema is compatible with every provided table config — useful to
 * ensure schema and table stay compatible when the schema is updated or added after the table
 * config — then performs the schema-only validation via {@link SchemaUtils#validate(Schema)}.
 *
 * @param schema schema to validate
 * @param tableConfigs table configs associated with this schema (table configs with raw name =
 *                     schema name); may be null or empty
 */
public static void validate(Schema schema, List<TableConfig> tableConfigs) {
  // Guard against callers passing null instead of an empty list.
  if (tableConfigs != null) {
    for (TableConfig tableConfig : tableConfigs) {
      validateCompatibilityWithTableConfig(schema, tableConfig);
    }
  }
  validate(schema);
}

/**
* Validates the following:
* 1) Checks valid transform function -
Expand Down Expand Up @@ -89,6 +106,19 @@ public static void validate(Schema schema) {
transformedColumns.retainAll(argumentColumns));
}

/**
 * Checks that the schema satisfies the table-level validations for the given table config by
 * delegating to {@link TableConfigUtils#validate}.
 */
private static void validateCompatibilityWithTableConfig(Schema schema, TableConfig tableConfig) {
  try {
    TableConfigUtils.validate(tableConfig, schema);
  } catch (Exception e) {
    // Wrap with context identifying which table config the schema clashed with.
    String message = "Schema is incompatible with tableConfig with name: " + tableConfig.getTableName()
        + " and type: " + tableConfig.getTableType();
    throw new IllegalStateException(message, e);
  }
}

/**
* Checks for valid incoming and outgoing granularity spec in the time field spec
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimeUtils;


Expand All @@ -54,10 +55,15 @@ private TableConfigUtils() {
* 1. Validation config
* 2. IngestionConfig
* 3. TierConfigs
*
* TODO: Add more validations for each section (e.g. verify column names used in the indexing, validate conditions are met for aggregateMetrics etc)
*/
public static void validate(TableConfig tableConfig) {
validateValidationConfig(tableConfig);
validateIngestionConfig(tableConfig.getIngestionConfig());
public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
if (tableConfig.getTableType() == TableType.REALTIME) {
Preconditions.checkState(schema != null, "Schema should not be null for REALTIME table");
}
validateValidationConfig(tableConfig, schema);
validateIngestionConfig(tableConfig.getIngestionConfig(), schema);
validateTierConfigList(tableConfig.getTierConfigsList());
}

Expand All @@ -74,19 +80,37 @@ public static void validateTableName(TableConfig tableConfig) {
}
}

private static void validateValidationConfig(TableConfig tableConfig) {
/**
* Validates the following in the validationConfig of the table
* 1. For REALTIME table
* - checks for non-null timeColumnName
* - checks for valid field spec for timeColumnName in schema
*
* 2. For OFFLINE table
* - checks for valid field spec for timeColumnName in schema, if timeColumnName and schema are non-null
*
* 3. Checks peerDownloadSchema
*/
private static void validateValidationConfig(TableConfig tableConfig, @Nullable Schema schema) {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
if (validationConfig != null) {
if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) {
throw new IllegalStateException("Must provide time column in real-time table config");
}
String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
if (peerSegmentDownloadScheme != null) {
if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)
&& !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme
+ "' for peerSegmentDownloadScheme. Must be one of http nor https");
}
String timeColumnName = validationConfig.getTimeColumnName();
if (tableConfig.getTableType() == TableType.REALTIME) {
// For REALTIME table, must have a non-null timeColumnName
Preconditions.checkState(timeColumnName != null, "'timeColumnName' cannot be null in REALTIME table config");
}
// timeColumnName can be null in OFFLINE table
if (timeColumnName != null && schema != null) {
Preconditions.checkState(schema.getSpecForTimeColumn(timeColumnName) != null,
"Cannot find valid fieldSpec for timeColumn: %s from the table config, in the schema: %s", timeColumnName,
schema.getSchemaName());
}

String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
if (peerSegmentDownloadScheme != null) {
if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL
.equalsIgnoreCase(peerSegmentDownloadScheme)) {
throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme
+ "' for peerSegmentDownloadScheme. Must be one of http or https");
}
}
}
Expand All @@ -99,8 +123,10 @@ private static void validateValidationConfig(TableConfig tableConfig) {
* 4. validity of transform function string
* 5. checks for source fields used in destination columns
*/
private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) {
private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig, @Nullable Schema schema) {
if (ingestionConfig != null) {

// Filter config
FilterConfig filterConfig = ingestionConfig.getFilterConfig();
if (filterConfig != null) {
String filterFunction = filterConfig.getFilterFunction();
Expand All @@ -112,12 +138,18 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC
}
}
}

// Transform configs
List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
if (transformConfigs != null) {
Set<String> transformColumns = new HashSet<>();
Set<String> argumentColumns = new HashSet<>();
for (TransformConfig transformConfig : transformConfigs) {
String columnName = transformConfig.getColumnName();
if (schema != null) {
Preconditions.checkState(schema.getFieldSpecFor(columnName) != null,
"The destination column of the transform function must be present in the schema");
}
String transformFunction = transformConfig.getTransformFunction();
if (columnName == null || transformFunction == null) {
throw new IllegalStateException(
Expand Down
Loading