[fix](dynamic partition) partition create failed after alter distributed column (apache#20239)

This PR fixes the following two problems:

Problem 1: altering a column comment makes adding a dynamic partition fail (issue apache#10811)

Reproduce steps:
create a table with a dynamic partition policy;
restart the FE;
alter the distribution column's comment;
alter dynamic_partition.end to trigger adding a new partition via the dynamic partition scheduler.

The new partition then fails to be created, with the following error log:
dynamic add partition failed: errCode = 2, detailMessage = Cannot assign hash distribution with different distribution cols. default is: [id int(11) NULL COMMENT 'new_comment_of_id'], db: default_cluster:example_db, table: test_2

Problem 2: renaming a distribution column makes inserts into old partitions fail (apache#20405)

The key point of the reproduce steps is restarting the FE.

All versions appear to be affected, including master, lts-1.1, and so on.
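To make the role of the FE restart concrete, the following is a minimal, self-contained Java sketch, not Doris code: SimpleColumn and RestartScenario are hypothetical names, and only the name, type, and comment attributes of a real Column are modeled. It shows why a comment-sensitive column comparison keeps passing while the table and its distribution info share the same in-memory object, but starts failing once the two are persisted and reloaded as separate objects, which is exactly the state the dynamic partition scheduler hits after a restart followed by a comment alter.

import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a catalog column; only name, type, and comment matter here.
class SimpleColumn {
    String name;
    String type;
    String comment;

    SimpleColumn(String name, String type, String comment) {
        this.name = name;
        this.type = type;
        this.comment = comment;
    }

    // Copy constructor, analogous to the new Column(column) deep copy in this commit.
    SimpleColumn(SimpleColumn other) {
        this(other.name, other.type, other.comment);
    }

    // Comment-sensitive equality, mirroring the spirit of the original Column.equals.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleColumn)) {
            return false;
        }
        SimpleColumn that = (SimpleColumn) o;
        return name.equalsIgnoreCase(that.name) && type.equals(that.type)
                && Objects.equals(comment, that.comment);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name.toLowerCase(), type, comment);
    }
}

public class RestartScenario {
    public static void main(String[] args) {
        // Before an FE restart: the distribution info holds the very same object as the table,
        // so altering the comment mutates both sides at once and a comment-sensitive
        // comparison still passes.
        SimpleColumn tableColumn = new SimpleColumn("id", "int(11)", null);
        List<SimpleColumn> sharedDistributionColumns = List.of(tableColumn); // shared reference
        tableColumn.comment = "new_comment_of_id";
        System.out.println(List.of(tableColumn).equals(sharedDistributionColumns)); // true

        // After an FE restart: table columns and distribution columns are deserialized as
        // separate objects, so the same comment alter leaves them unequal, and the scheduler's
        // old check throws "Cannot assign hash distribution with different distribution cols".
        SimpleColumn reloadedTableColumn = new SimpleColumn("id", "int(11)", null);
        List<SimpleColumn> reloadedDistributionColumns = List.of(new SimpleColumn(reloadedTableColumn));
        reloadedTableColumn.comment = "new_comment_of_id";
        System.out.println(List.of(reloadedTableColumn).equals(reloadedDistributionColumns)); // false
    }
}

The changes below address both halves of the problem: toDistributionInfo now deep-copies columns so the in-memory layout matches what is persisted, and the scheduler's check switches to a comment-insensitive comparison (equalsForDistribution / sameDistributionColumns).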
camby authored Jun 5, 2023
1 parent d392791 commit 3c28a71
Showing 9 changed files with 181 additions and 17 deletions.
@@ -131,7 +131,9 @@ public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlExcep
+ column.getName() + "].");
}

- distributionColumns.add(column);
+ // distribution info and base columns are persisted separately inside OlapTable, so we need a deep copy
+ // so that modifying the table columns does not also modify the columns inside the distribution info.
+ distributionColumns.add(new Column(column));
find = true;
break;
}
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -784,6 +784,28 @@ && getScale() == other.getScale()
&& Objects.equals(realDefaultValue, other.realDefaultValue);
}

// Distribution column comparison only cares about attributes that affect data,
// not about attributes such as the comment.
public boolean equalsForDistribution(Column other) {
if (other == this) {
return true;
}

return name.equalsIgnoreCase(other.name)
&& Objects.equals(getDefaultValue(), other.getDefaultValue())
&& Objects.equals(aggregationType, other.aggregationType)
&& isAggregationTypeImplicit == other.isAggregationTypeImplicit
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& visible == other.visible
&& Objects.equals(children, other.children)
&& Objects.equals(realDefaultValue, other.realDefaultValue);
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
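As a rough intuition for what the new comparison tolerates versus what it still rejects, here is a hedged sketch; Col, fullEquals, and equalsForDistribution below are simplified stand-ins rather than the real Doris methods, and the real equalsForDistribution also compares default values, aggregation type, nullability, precision, scale, visibility, and children, as shown above.

import java.util.Objects;

public class DistributionEqualityDemo {
    // Minimal stand-in for Column; only the attributes this demo needs.
    record Col(String name, String type, String comment) {}

    // Mirrors the intent of the full Column.equals: every attribute counts, including the comment.
    static boolean fullEquals(Col a, Col b) {
        return a.name().equalsIgnoreCase(b.name()) && a.type().equals(b.type())
                && Objects.equals(a.comment(), b.comment());
    }

    // Mirrors the intent of equalsForDistribution: only data-affecting attributes count.
    static boolean equalsForDistribution(Col a, Col b) {
        return a.name().equalsIgnoreCase(b.name()) && a.type().equals(b.type());
    }

    public static void main(String[] args) {
        Col persisted = new Col("id", "int(11)", null);
        Col afterCommentAlter = new Col("id", "int(11)", "new_comment_of_id");
        Col differentType = new Col("id", "bigint", null);

        System.out.println(fullEquals(persisted, afterCommentAlter));            // false: the old check would throw
        System.out.println(equalsForDistribution(persisted, afterCommentAlter)); // true: comment changes are tolerated
        System.out.println(equalsForDistribution(persisted, differentType));     // false: real mismatches are still rejected
    }
}

A comment-only difference no longer trips the distribution check, while a genuine difference such as the column type still does.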
25 changes: 18 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4290,12 +4290,26 @@ private void renameColumn(Database db, OlapTable table, String colName,
// 4. modify distribution info
DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
if (distributionInfo.getType() == DistributionInfoType.HASH) {
// modify default distribution info
List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
for (Column column : distributionColumns) {
if (column.getName().equalsIgnoreCase(colName)) {
column.setName(newColName);
}
}
// modify distribution info inside partitions
for (Partition p : table.getPartitions()) {
DistributionInfo partDistInfo = p.getDistributionInfo();
if (partDistInfo.getType() != DistributionInfoType.HASH) {
continue;
}
List<Column> partDistColumns = ((HashDistributionInfo) partDistInfo).getDistributionColumns();
for (Column column : partDistColumns) {
if (column.getName().equalsIgnoreCase(colName)) {
column.setName(newColName);
}
}
}
}

// 5. modify sequence map col
@@ -4546,13 +4560,10 @@ public void modifyDefaultDistributionBucketNum(Database db, OlapTable olapTable,
}
if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
- List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
- List<Column> defaultDistriCols
-         = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
- if (!newDistriCols.equals(defaultDistriCols)) {
-     throw new DdlException(
-             "Cannot assign hash distribution with different distribution cols. " + "default is: "
-                     + defaultDistriCols);
+ if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
+     throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+             + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+             + ((HashDistributionInfo) distributionInfo).getDistributionColumns());
}
}

@@ -95,6 +95,18 @@ public static DistributionInfo read(DataInput in) throws IOException {
return distributionInfo;
}

public boolean sameDistributionColumns(HashDistributionInfo other) {
if (distributionColumns.size() != other.distributionColumns.size()) {
return false;
}
for (int i = 0; i < distributionColumns.size(); ++i) {
if (!distributionColumns.get(i).equalsForDistribution(other.distributionColumns.get(i))) {
return false;
}
}
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -107,7 +119,7 @@ public boolean equals(Object o) {
return false;
}
HashDistributionInfo that = (HashDistributionInfo) o;
- return bucketNum == that.bucketNum && Objects.equals(distributionColumns, that.distributionColumns);
+ return bucketNum == that.bucketNum && sameDistributionColumns(that);
}

@Override
@@ -1410,17 +1410,14 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa

if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
- List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
- List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
-         .getDistributionColumns();
- if (!newDistriCols.equals(defaultDistriCols)) {
-     throw new DdlException(
-             "Cannot assign hash distribution with different distribution cols. " + "default is: "
-                     + defaultDistriCols);
- }
if (hashDistributionInfo.getBucketNum() <= 0) {
throw new DdlException("Cannot assign hash distribution buckets less than 1");
}
+ if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
+     throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+             + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+             + ((HashDistributionInfo) distributionInfo).getDistributionColumns());
+ }
} else if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
RandomDistributionInfo randomDistributionInfo = (RandomDistributionInfo) distributionInfo;
if (randomDistributionInfo.getBucketNum() <= 0) {
2 changes: 2 additions & 0 deletions regression-test/pipeline/p0/conf/fe.conf
@@ -80,3 +80,5 @@ enable_struct_type=true

# enable mtmv
enable_mtmv = true

dynamic_partition_check_interval_seconds=5
2 changes: 2 additions & 0 deletions regression-test/pipeline/p1/conf/fe.conf
@@ -82,3 +82,5 @@ enable_mtmv = true
# enable auto collect statistics
enable_auto_collect_statistics=true
auto_check_statistics_in_sec=60

dynamic_partition_check_interval_seconds=5
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_dynamic_partition_with_alter") {
def tbl = "test_dynamic_partition_with_alter"
sql "drop table if exists ${tbl}"
sql """
CREATE TABLE IF NOT EXISTS ${tbl}
( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
AGGREGATE KEY(k1,k2)
PARTITION BY RANGE(k1) ( )
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"dynamic_partition.enable"="true",
"dynamic_partition.end"="3",
"dynamic_partition.buckets"="1",
"dynamic_partition.start"="-3",
"dynamic_partition.prefix"="p",
"dynamic_partition.time_unit"="DAY",
"dynamic_partition.create_history_partition"="true",
"dynamic_partition.replication_allocation" = "tag.location.default: 1")
"""
result = sql "show partitions from ${tbl}"
assertEquals(7, result.size())

// modify the distribution column comment, then try to add two more dynamic partitions
sql """ alter table ${tbl} modify column k1 comment 'new_comment_for_k1' """
sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
sql """ alter table ${tbl} set('dynamic_partition.end'='5') """
result = sql "show partitions from ${tbl}"
for (def retry = 0; retry < 15; retry++) {
if (result.size() == 9) {
break;
}
logger.info("wait dynamic partition scheduler, sleep 1s")
sleep(1000);
result = sql "show partitions from ${tbl}"
}
assertEquals(9, result.size())

sql "drop table ${tbl}"
}
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_dynamic_partition_with_rename") {
def tbl = "test_dynamic_partition_with_rename"
sql "drop table if exists ${tbl}"
sql """
CREATE TABLE IF NOT EXISTS ${tbl}
( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
AGGREGATE KEY(k1,k2)
PARTITION BY RANGE(k1) ( )
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"dynamic_partition.enable"="true",
"dynamic_partition.end"="3",
"dynamic_partition.buckets"="1",
"dynamic_partition.start"="-3",
"dynamic_partition.prefix"="p",
"dynamic_partition.time_unit"="DAY",
"dynamic_partition.create_history_partition"="true",
"dynamic_partition.replication_allocation" = "tag.location.default: 1")
"""
result = sql "show partitions from ${tbl}"
assertEquals(7, result.size())

// rename the distribution column, then try to add two more dynamic partitions
sql "alter table ${tbl} rename column k1 renamed_k1"
sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
sql """ alter table ${tbl} set('dynamic_partition.end'='5') """
result = sql "show partitions from ${tbl}"
for (def retry = 0; retry < 15; retry++) {
if (result.size() == 9) {
break;
}
logger.info("wait dynamic partition scheduler, sleep 1s")
sleep(1000);
result = sql "show partitions from ${tbl}"
}
assertEquals(9, result.size())
for (def line = 0; line < result.size(); line++) {
// XXX: DistributionKey is at position 7 of the SHOW PARTITIONS result; this may later be obtained via SQL metadata instead
assertEquals("renamed_k1", result.get(line).get(7))
}

sql "drop table ${tbl}"
}
