Skip to content

Commit

Permalink
[Enhancement] Reduce lock time of schema change in scenarios with a l…
Browse files Browse the repository at this point in the history
…arge number of columns (#52800)

Signed-off-by: trueeyu <lxhhust350@qq.com>
(cherry picked from commit f004276)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java
#	fe/fe-core/src/test/java/com/starrocks/alter/SchemaChangeHandlerTest.java
  • Loading branch information
trueeyu authored and mergify[bot] committed Nov 12, 2024
1 parent c4e730c commit 0c22bf4
Show file tree
Hide file tree
Showing 2 changed files with 466 additions and 0 deletions.
110 changes: 110 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.starrocks.catalog.Type;
import com.starrocks.catalog.UniqueConstraint;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.CaseSensibility;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
Expand Down Expand Up @@ -1024,10 +1025,23 @@ private AlterJobV2 createJob(long dbId, OlapTable olapTable, Map<Long, LinkedLis
}

// 2. check compatible
Map<String, Column> originSchemaMap = buildSchemaMapFromList(originSchema, true,
CaseSensibility.COLUMN.getCaseSensibility());

for (Column alterColumn : alterSchema) {
<<<<<<< HEAD
Optional<Column> col = originSchema.stream().filter(c -> c.nameEquals(alterColumn.getName(), true)).findFirst();
if (col.isPresent() && !alterColumn.equals(col.get())) {
col.get().checkSchemaChangeAllowed(alterColumn);
=======
if (modifyFieldColumns.contains(alterColumn.getName())) {
continue;
}
Column col = getColumnFromSchemaMap(originSchemaMap, alterColumn.getName(), true,
CaseSensibility.COLUMN.getCaseSensibility());
if (col != null && !alterColumn.equals(col)) {
col.checkSchemaChangeAllowed(alterColumn);
>>>>>>> f004276c81 ([Enhancement] Reduce lock time of schema change in scenarios with a large number of columns (#52800))
}
}

Expand Down Expand Up @@ -1111,9 +1125,105 @@ private AlterJobV2 createJob(long dbId, OlapTable olapTable, Map<Long, LinkedLis
return jobBuilder.build();
}

<<<<<<< HEAD
private AlterJobV2 createJobForProcessReorderColumnOfPrimaryKey(long dbId, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap,
List<Integer> sortKeyIdxes) throws UserException {
=======
protected static Map<String, Column> buildSchemaMapFromList(List<Column> schema, boolean ignorePrefix,
boolean caseSensibility) {
Map<String, Column> schemaMap = Maps.newHashMap();
if (ignorePrefix) {
if (caseSensibility) {
schema.forEach(col -> schemaMap.put(Column.removeNamePrefix(col.getName()), col));
} else {
schema.forEach(col -> schemaMap.put(Column.removeNamePrefix(col.getName()).toLowerCase(), col));
}
} else {
if (caseSensibility) {
schema.forEach(col -> schemaMap.put(col.getName(), col));
} else {
schema.forEach(col -> schemaMap.put(col.getName().toLowerCase(), col));
}
}
return schemaMap;
}

protected static Column getColumnFromSchemaMap(Map<String, Column> originSchemaMap, String col,
boolean ignorePrefix, boolean caseSensibility) {
if (caseSensibility) {
if (ignorePrefix) {
return originSchemaMap.get(Column.removeNamePrefix(col));
} else {
return originSchemaMap.get(col);
}
} else {
if (ignorePrefix) {
return originSchemaMap.get(Column.removeNamePrefix(col).toLowerCase());
} else {
return originSchemaMap.get(col.toLowerCase());
}
}
}

private void checkPartitionColumnChange(OlapTable olapTable, List<Column> alterSchema, long alterIndexId)
throws DdlException {
// Since partition key and distribution key's schema change can only happen in base index,
// only check it in base index.There may be some trivial changes between base index and other
// indices(eg: AggregateType), so check it in base index.
if (alterIndexId != olapTable.getBaseIndexId()) {
return;
}

List<Column> partitionColumns = olapTable.getPartitionInfo().getPartitionColumns(olapTable.getIdToColumn());
for (Column partitionCol : partitionColumns) {
String colName = partitionCol.getName();
Optional<Column> col = alterSchema.stream().filter(c -> c.nameEquals(colName, true)).findFirst();
if (col.isPresent() && !col.get().equals(partitionCol)) {
throw new DdlException("Can not modify partition column[" + colName + "]. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
if (col.isEmpty() && alterIndexId == olapTable.getBaseIndexId()) {
// 2.1 partition column cannot be deleted.
throw new DdlException("Partition column[" + partitionCol.getName()
+ "] cannot be dropped. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
// ATTN. partition columns' order also need remaining unchanged.
// for now, we only allow one partition column, so no need to check order.
}
} // end for partitionColumns
}

private void checkDistributionColumnChange(OlapTable olapTable, List<Column> alterSchema, long alterIndexId)
throws DdlException {
// Since partition key and distribution key's schema change can only happen in base index,
// only check it in base index.There may be some trivial changes between base index and other
// indices(eg: AggregateType), so check it in base index.
if (alterIndexId != olapTable.getBaseIndexId()) {
return;
}

List<Column> distributionColumns = MetaUtils.getColumnsByColumnIds(
olapTable, olapTable.getDefaultDistributionInfo().getDistributionColumns());
for (Column distributionCol : distributionColumns) {
String colName = distributionCol.getName();
Optional<Column> col = alterSchema.stream().filter(c -> c.nameEquals(colName, true)).findFirst();
if (col.isPresent() && !col.get().equals(distributionCol)) {
throw new DdlException("Can not modify distribution column[" + colName + "]. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
if (col.isEmpty() && alterIndexId == olapTable.getBaseIndexId()) {
// 2.2 distribution column cannot be deleted.
throw new DdlException("Distribution column[" + distributionCol.getName()
+ "] cannot be dropped. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
}
} // end for distributionCols
}

private AlterJobV2 createJobForProcessModifySortKeyColumn(long dbId, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap,
List<Integer> sortKeyIdxes,
List<Integer> sortKeyUniqueIds) throws UserException {
>>>>>>> f004276c81 ([Enhancement] Reduce lock time of schema change in scenarios with a large number of columns (#52800))
if (olapTable.getState() == OlapTableState.ROLLUP) {
throw new DdlException("Table[" + olapTable.getName() + "]'s is doing ROLLUP job");
}
Expand Down
Loading

0 comments on commit 0c22bf4

Please sign in to comment.