KYLIN-3835 [Defective TableSchemaUpdateChecker] Don't check used models when reload table
Qsimple authored and shaofengshi committed Mar 4, 2019
1 parent 636f93a commit 7519629
Showing 3 changed files with 136 additions and 28 deletions.
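
In brief, the reworked TableSchemaUpdateChecker validates a reloaded table against the data models that use it, not only against the cubes, and the old inline per-cube checks move into helper methods. A condensed sketch of the new allowReload flow, simplified from the second file's diff (the original if/else layout and message formatting are compressed here; this is not the verbatim committed code):

// Condensed sketch of the new flow; see the full diff below.
public CheckResult allowReload(TableDesc newTableDesc, String prj) {
    final String fullTableName = newTableDesc.getIdentity();
    if (metadataManager.getTableDesc(fullTableName, prj) == null) {
        return CheckResult.validOnFirstLoad(fullTableName);      // first load: nothing to compare against
    }
    List<String> issues = Lists.newArrayList();
    for (DataModelDesc usedModel : findModelByTable(newTableDesc, prj)) {
        checkValidationInModel(newTableDesc, issues, usedModel); // new in this commit
    }
    for (CubeInstance cube : findCubeByTable(newTableDesc)) {
        checkValidationInCube(newTableDesc, issues, cube);       // extracted from the old inline checks
    }
    return issues.isEmpty() ? CheckResult.validOnCompatibleSchema(fullTableName)
            : CheckResult.invalidOnIncompatibleSchema(fullTableName, issues);
}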
@@ -192,7 +192,7 @@ public List<DataModelDesc> getModels(String projectName) {
}

// within a project, find models that use the specified table
public List<String> getModelsUsingTable(TableDesc table, String project) throws IOException {
public List<String> getModelsUsingTable(TableDesc table, String project) {
try (AutoLock lock = modelMapLock.lockForRead()) {
List<String> models = new ArrayList<>();
for (DataModelDesc modelDesc : getModels(project)) {
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/

package org.apache.kylin.rest.service;

@@ -27,13 +27,16 @@

import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.model.ModelDimensionDesc;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
@@ -44,6 +47,7 @@
public class TableSchemaUpdateChecker {
private final TableMetadataManager metadataManager;
private final CubeManager cubeManager;
private final DataModelManager dataModelManager;

static class CheckResult {
private final boolean valid;
@@ -87,9 +91,10 @@ static CheckResult invalidOnIncompatibleSchema(String tableName, List<String> re
}
}

TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager) {
TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager, DataModelManager dataModelManager) {
this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
this.dataModelManager = checkNotNull(dataModelManager, "dataModelManager is null");
}

private List<CubeInstance> findCubeByTable(final TableDesc table) {
@@ -133,8 +138,8 @@ private boolean isColumnCompatible(ColumnDesc column, ColumnDesc newCol) {
* check whether all columns used in `cube` have a compatible schema in the current hive schema denoted by `newTable`.
* @param cube cube to check, must use `table` in its model
* @param origTable kylin's table metadata
* @param fieldsMap current hive schema of `table`
* @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise
* @param newTable current hive schema of `table`
* @return columns in origTable that can't be found in newTable
*/
private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc origTable, TableDesc newTable) {
Set<ColumnDesc> usedColumns = Sets.newHashSet();
@@ -157,8 +162,8 @@ private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc origTabl
/**
* check whether all columns in `origTable` are still present in `newTable` and have the same index as before.
*
* @param table kylin's table metadata
* @param fields current table metadata in hive
* @param origTable kylin's table metadata
* @param newTable current table metadata in hive
* @return true if only new columns are appended in hive, false otherwise
*/
private boolean checkAllColumnsInTableDesc(TableDesc origTable, TableDesc newTable) {
@@ -182,35 +187,138 @@ public CheckResult allowReload(TableDesc newTableDesc, String prj) {
if (existing == null) {
return CheckResult.validOnFirstLoad(fullTableName);
}

List<String> issues = Lists.newArrayList();

for (DataModelDesc usedModel : findModelByTable(newTableDesc, prj)){
checkValidationInModel(newTableDesc, issues, usedModel);
}

for (CubeInstance cube : findCubeByTable(newTableDesc)) {
checkValidationInCube(newTableDesc, issues, cube);
}

if (issues.isEmpty()) {
return CheckResult.validOnCompatibleSchema(fullTableName);
}
return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues);
}

private Iterable<? extends DataModelDesc> findModelByTable(TableDesc newTableDesc, String prj) {
List<DataModelDesc> usedModels = Lists.newArrayList();
List<String> modelNames = dataModelManager.getModelsUsingTable(newTableDesc, prj);
modelNames.stream()
.map(mn -> dataModelManager.getDataModelDesc(mn))
.filter(m -> null != m)
.forEach(m -> usedModels.add(m));

return usedModels;
}

private void checkValidationInCube(TableDesc newTableDesc, List<String> issues, CubeInstance cube) {
final String fullTableName = newTableDesc.getIdentity();
String modelName = cube.getModel().getName();
// if user reloads a fact table used by cube, then all used columns must match current schema
if (cube.getModel().isFactTable(fullTableName)) {
TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc();
List<String> violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc);
if (!violateColumns.isEmpty()) {
issues.add(format(Locale.ROOT, "Column %s used in cube[%s] and model[%s], but changed " + "in hive",
violateColumns, cube.getName(), modelName));
}
}

// if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns
// must be the same (except compatible type changes)
if (cube.getModel().isLookupTable(fullTableName)) {
TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc();
if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) {
issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in cube[%s] and model[%s], but "
+ "changed in " + "hive, only append operation are supported on hive table as lookup table",
lookupTable.getIdentity(), cube.getName(), modelName));
}
}
}

private void checkValidationInModel(TableDesc newTableDesc, List<String> issues, DataModelDesc usedModel){
final String fullTableName = newTableDesc.getIdentity();
// if user reloads a fact table used by model, then all used columns must match current schema
if (usedModel.isFactTable(fullTableName)) {
TableDesc factTable = usedModel.findFirstTable(fullTableName).getTableDesc();
List<String> violateColumns = checkAllColumnsInFactTable(usedModel, factTable, newTableDesc);
if (!violateColumns.isEmpty()) {
issues.add(format(Locale.ROOT, "Column %s used in model[%s], but changed " + "in hive",
violateColumns, usedModel.getName()));
}
}

// if user reloads a lookup table used by model, only append column(s) are allowed, all existing columns
// must be the same (except compatible type changes)
if (usedModel.isLookupTable(fullTableName)) {
TableDesc lookupTable = usedModel.findFirstTable(fullTableName).getTableDesc();
if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) {
issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in model[%s], but "
+ "changed in " + "hive, only append operation are supported on hive table as lookup table",
lookupTable.getIdentity(), usedModel.getName()));
}
}
}

private List<String> checkAllColumnsInFactTable(DataModelDesc usedModel, TableDesc factTable, TableDesc newTableDesc) {
List<String> violateColumns = Lists.newArrayList();

for (ColumnDesc column : findUsedColumnsInFactTable(usedModel, factTable)) {
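// Note (added for clarity, not part of this commit): computed columns are defined in the
// Kylin model rather than in the source Hive table, so they are excluded from the
// comparison against the reloaded Hive schema.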
if (!column.isComputedColumn()) {
ColumnDesc newCol = newTableDesc.findColumnByName(column.getName());
if (newCol == null || !isColumnCompatible(column, newCol)) {
violateColumns.add(column.getName());
}
}
}
return violateColumns;
}

// get table name from column full name
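// Worked example (added for clarity, not part of this commit):
//   getTableName("DEFAULT.KYLIN_SALES.PRICE") -> "KYLIN_SALES"  (db prefix stripped)
//   getTableName("KYLIN_SALES.PRICE")         -> "KYLIN_SALES"
//   getTableName("PRICE")                     -> null           (no table prefix)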
private String getTableName(String columnName) {
int lastIndexOfDot = columnName.lastIndexOf('.');
String tableName = null;
if (lastIndexOfDot >= 0) {
tableName = columnName.substring(0, lastIndexOfDot);
} else {
return null;
}
// maybe contain db name
lastIndexOfDot = tableName.lastIndexOf('.');
if (lastIndexOfDot >= 0) {
tableName = tableName.substring(lastIndexOfDot + 1);
}
return tableName;
}

private ColumnDesc mustGetColumnDesc(TableDesc factTable, String columnName) {
ColumnDesc columnDesc = factTable.findColumnByName(columnName);
Preconditions.checkNotNull(columnDesc,
format(Locale.ROOT, "Can't find column %s in current fact table %s.", columnName, factTable.getIdentity()));
return columnDesc;
}

private Set<ColumnDesc> findUsedColumnsInFactTable(DataModelDesc usedModel, TableDesc factTable) {
Set<ColumnDesc> usedColumns = Sets.newHashSet();
// column in dimension
for (ModelDimensionDesc dim : usedModel.getDimensions()) {
if (dim.getTable().equalsIgnoreCase(factTable.getName())) {
for (String col : dim.getColumns()) {
usedColumns.add(mustGetColumnDesc(factTable, col));
}
}
}

// column in measure
for (String columnInMeasure : usedModel.getMetrics()) {
if (factTable.getName().equalsIgnoreCase(getTableName(columnInMeasure))) {
usedColumns.add(mustGetColumnDesc(factTable, columnInMeasure));
}
}

return usedColumns;
}
}
@@ -146,7 +146,7 @@ private String[] loadTablesToProject(List<Pair<TableDesc, TableExtDesc>> allMeta
// do schema check
TableMetadataManager metaMgr = getTableManager();
CubeManager cubeMgr = getCubeManager();
TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr);
TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager());
for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
TableDesc tableDesc = pair.getFirst();
TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);
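
Because the constructor gains a third argument (and remains package-private, as the diff shows), any other call site in org.apache.kylin.rest.service has to supply a DataModelManager as well. A minimal migration sketch follows; the getInstance-style manager lookups follow Kylin's usual manager pattern but are assumptions here, as are the tableDesc and project variables:

// Hypothetical call site, for illustration only.
KylinConfig config = KylinConfig.getInstanceFromEnv();            // assumed configuration lookup
TableMetadataManager metaMgr = TableMetadataManager.getInstance(config);
CubeManager cubeMgr = CubeManager.getInstance(config);
DataModelManager modelMgr = DataModelManager.getInstance(config); // new dependency added by this commit

TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, modelMgr);
TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);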
