diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java index 7f9fc5e0567..c1ffbf794ad 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java @@ -192,7 +192,7 @@ public List getModels(String projectName) { } // within a project, find models that use the specified table - public List getModelsUsingTable(TableDesc table, String project) throws IOException { + public List getModelsUsingTable(TableDesc table, String project) { try (AutoLock lock = modelMapLock.lockForRead()) { List models = new ArrayList<>(); for (DataModelDesc modelDesc : getModels(project)) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java index 89a505a2560..46fa5ae159c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.rest.service; @@ -27,6 +27,7 @@ import javax.annotation.Nullable; +import com.google.common.base.Preconditions; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.metadata.TableMetadataManager; @@ -34,6 +35,8 @@ import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.model.DataModelManager; +import org.apache.kylin.metadata.model.ModelDimensionDesc; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; @@ -44,6 +47,7 @@ public class TableSchemaUpdateChecker { private final TableMetadataManager metadataManager; private final CubeManager cubeManager; + private final DataModelManager dataModelManager; static class CheckResult { private final boolean valid; @@ -87,9 +91,10 @@ static CheckResult invalidOnIncompatibleSchema(String tableName, List re } } - TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager) { + TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager, DataModelManager dataModelManager) { this.metadataManager = checkNotNull(metadataManager, "metadataManager is null"); this.cubeManager = checkNotNull(cubeManager, "cubeManager is null"); + this.dataModelManager = checkNotNull(dataModelManager, "dataModelManager is null"); } private List findCubeByTable(final TableDesc table) { @@ -133,8 +138,8 @@ private boolean isColumnCompatible(ColumnDesc column, ColumnDesc newCol) { * check whether all columns used in `cube` has compatible schema in current hive schema denoted by `fieldsMap`. 
* @param cube cube to check, must use `table` in its model * @param origTable kylin's table metadata - * @param fieldsMap current hive schema of `table` - * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise + * @param newTable current hive schema of `table` + * @return columns in origTable that can't be found in newTable */ private List checkAllColumnsInCube(CubeInstance cube, TableDesc origTable, TableDesc newTable) { Set usedColumns = Sets.newHashSet(); @@ -157,8 +162,8 @@ private List checkAllColumnsInCube(CubeInstance cube, TableDesc origTabl /** * check whether all columns in `table` are still in `fields` and have the same index as before. * - * @param table kylin's table metadata - * @param fields current table metadata in hive + * @param origTable kylin's table metadata + * @param newTable current table metadata in hive * @return true if only new columns are appended in hive, false otherwise */ private boolean checkAllColumnsInTableDesc(TableDesc origTable, TableDesc newTable) { @@ -182,35 +187,138 @@ public CheckResult allowReload(TableDesc newTableDesc, String prj) { if (existing == null) { return CheckResult.validOnFirstLoad(fullTableName); } - List issues = Lists.newArrayList(); + + for (DataModelDesc usedModel : findModelByTable(newTableDesc, prj)){ + checkValidationInModel(newTableDesc, issues, usedModel); + } + for (CubeInstance cube : findCubeByTable(newTableDesc)) { - String modelName = cube.getModel().getName(); - - // if user reloads a fact table used by cube, then all used columns must match current schema - if (cube.getModel().isFactTable(fullTableName)) { - TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); - List violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc); - if (!violateColumns.isEmpty()) { - issues.add(format(Locale.ROOT, "Column %s used in cube[%s] and model[%s], but changed " + "in hive", - violateColumns, cube.getName(), 
modelName)); + checkValidationInCube(newTableDesc, issues, cube); + } + + if (issues.isEmpty()) { + return CheckResult.validOnCompatibleSchema(fullTableName); + } + return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues); + } + + private Iterable findModelByTable(TableDesc newTableDesc, String prj) { + List usedModels = Lists.newArrayList(); + List modelNames = dataModelManager.getModelsUsingTable(newTableDesc, prj); + modelNames.stream() + .map(mn -> dataModelManager.getDataModelDesc(mn)) + .filter(m -> null != m) + .forEach(m -> usedModels.add(m)); + + return usedModels; + } + + private void checkValidationInCube(TableDesc newTableDesc, List issues, CubeInstance cube) { + final String fullTableName = newTableDesc.getIdentity(); + String modelName = cube.getModel().getName(); + // if user reloads a fact table used by cube, then all used columns must match current schema + if (cube.getModel().isFactTable(fullTableName)) { + TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); + List violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc); + if (!violateColumns.isEmpty()) { + issues.add(format(Locale.ROOT, "Column %s used in cube[%s] and model[%s], but changed " + "in hive", + violateColumns, cube.getName(), modelName)); + } + } + + // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns + // must be the same (except compatible type changes) + if (cube.getModel().isLookupTable(fullTableName)) { + TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); + if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { + issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in cube[%s] and model[%s], but " + + "changed in " + "hive, only append operation are supported on hive table as lookup table", + lookupTable.getIdentity(), cube.getName(), modelName)); + } + } + } + + private void checkValidationInModel(TableDesc 
newTableDesc, List issues, DataModelDesc usedModel){ + final String fullTableName = newTableDesc.getIdentity(); + // if user reloads a fact table used by model, then all used columns must match current schema + if (usedModel.isFactTable(fullTableName)) { + TableDesc factTable = usedModel.findFirstTable(fullTableName).getTableDesc(); + List violateColumns = checkAllColumnsInFactTable(usedModel, factTable, newTableDesc); + if (!violateColumns.isEmpty()) { + issues.add(format(Locale.ROOT, "Column %s used in model[%s], but changed " + "in hive", + violateColumns, usedModel.getName())); + } + } + + // if user reloads a lookup table used by model, only append column(s) are allowed, all existing columns + // must be the same (except compatible type changes) + if (usedModel.isLookupTable(fullTableName)) { + TableDesc lookupTable = usedModel.findFirstTable(fullTableName).getTableDesc(); + if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { + issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in model[%s], but " + + "changed in " + "hive, only append operation are supported on hive table as lookup table", + lookupTable.getIdentity(), usedModel.getName())); + } + } + } + + private List checkAllColumnsInFactTable(DataModelDesc usedModel, TableDesc factTable, TableDesc newTableDesc) { + List violateColumns = Lists.newArrayList(); + + for (ColumnDesc column : findUsedColumnsInFactTable(usedModel, factTable)) { + if (!column.isComputedColumn()) { + ColumnDesc newCol = newTableDesc.findColumnByName(column.getName()); + if (newCol == null || !isColumnCompatible(column, newCol)) { + violateColumns.add(column.getName()); } } + } + return violateColumns; + } - // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns - // must be the same (except compatible type changes) - if (cube.getModel().isLookupTable(fullTableName)) { - TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); - if 
(!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { - issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in cube[%s] and model[%s], but " - + "changed in " + "hive", lookupTable.getIdentity(), cube.getName(), modelName)); + // get table name from column full name + private String getTableName(String columnName) { + int lastIndexOfDot = columnName.lastIndexOf('.'); + String tableName = null; + if (lastIndexOfDot >= 0) { + tableName = columnName.substring(0, lastIndexOfDot); + } else { + return null; + } + // the remaining prefix may still contain a db name; keep only the table part + lastIndexOfDot = tableName.lastIndexOf('.'); + if (lastIndexOfDot >= 0) { + tableName = tableName.substring(lastIndexOfDot + 1); + } + return tableName; + } + + private ColumnDesc mustGetColumnDesc(TableDesc factTable, String columnName) { + ColumnDesc columnDesc = factTable.findColumnByName(columnName); + Preconditions.checkNotNull(columnDesc, + format(Locale.ROOT, "Can't find column %s in current fact table %s.", columnName, factTable.getIdentity())); + return columnDesc; + } + + private Set findUsedColumnsInFactTable(DataModelDesc usedModel, TableDesc factTable) { + Set usedColumns = Sets.newHashSet(); + // column in dimension + for (ModelDimensionDesc dim : usedModel.getDimensions()) { + if (dim.getTable().equalsIgnoreCase(factTable.getName())) { + for (String col : dim.getColumns()) { + usedColumns.add(mustGetColumnDesc(factTable, col)); } } } - if (issues.isEmpty()) { - return CheckResult.validOnCompatibleSchema(fullTableName); + // column in measure + for (String columnInMeasure : usedModel.getMetrics()) { + if (factTable.getName().equalsIgnoreCase(getTableName(columnInMeasure))) { + usedColumns.add(mustGetColumnDesc(factTable, columnInMeasure)); + } } - return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues); + + return usedColumns; } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index e6921938af5..f5c6d2d853f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -146,7 +146,7 @@ private String[] loadTablesToProject(List> allMeta // do schema check TableMetadataManager metaMgr = getTableManager(); CubeManager cubeMgr = getCubeManager(); - TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr); + TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager()); for (Pair pair : allMeta) { TableDesc tableDesc = pair.getFirst(); TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);