From ecd3fd07f6497cb39334f1ed103efc29dae2a326 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 3 Apr 2023 14:03:42 +0800 Subject: [PATCH] [feature](colocate) support cross database colocate join (#18152) --- .../join-optimization/colocation-join.md | 19 ++ .../join-optimization/colocation-join.md | 21 +- .../apache/doris/common/FeMetaVersion.java | 4 +- .../doris/catalog/ColocateGroupSchema.java | 7 +- .../doris/catalog/ColocateTableIndex.java | 74 +++++- .../java/org/apache/doris/catalog/Env.java | 27 ++- .../ColocateTableCheckerAndBalancer.java | 25 +- .../doris/common/util/PropertyAnalyzer.java | 1 + .../doris/datasource/InternalCatalog.java | 8 +- .../apache/doris/journal/JournalEntity.java | 3 +- .../doris/persist/ColocatePersistInfo.java | 8 - .../doris/persist/TablePropertyInfo.java | 46 ++-- .../doris/planner/DistributedPlanner.java | 2 +- .../doris/catalog/ColocateTableTest.java | 2 +- .../doris/planner/ColocatePlanTest.java | 217 +++++++++++++----- .../doris/utframe/TestWithFeService.java | 6 + .../correctness_p0/test_colocate_join.out | 22 ++ .../correctness_p0/test_colocate_join.groovy | 111 +++++++++ 18 files changed, 476 insertions(+), 127 deletions(-) create mode 100644 regression-test/data/correctness_p0/test_colocate_join.out diff --git a/docs/en/docs/advanced/join-optimization/colocation-join.md b/docs/en/docs/advanced/join-optimization/colocation-join.md index fd67ee5a8a13fb..15357c291366f0 100644 --- a/docs/en/docs/advanced/join-optimization/colocation-join.md +++ b/docs/en/docs/advanced/join-optimization/colocation-join.md @@ -93,6 +93,25 @@ PROPERTIES( If the specified group does not exist, Doris automatically creates a group that contains only the current table. If the Group already exists, Doris checks whether the current table satisfies the Colocation Group Schema. If satisfied, the table is created and added to the Group. At the same time, the table creates fragments and replicas based on the data distribution rules of the existing Group. A Group belongs to a Database, and its name is unique within a Database. Internally, the full Group name is stored as `dbId_groupName`, but users only perceive the groupName. + + +Since version 2.0, Doris supports cross-database Groups. When creating a table, use the keyword `__global__` as a prefix of the Group name, for example: + +``` +CREATE TABLE tbl (k1 int, v1 int sum) +DISTRIBUTED BY HASH(k1) +BUCKETS 8 +PROPERTIES( + "colocate_with" = "__global__group1" +); +``` + +A Group prefixed with `__global__` no longer belongs to a single Database, and its name is globally unique. + +By creating a Global Group, Colocate Join can be performed across Databases. + + + ### Delete table When the last table in a Group is completely deleted (completely deleted means removed from the recycle bin; usually, a table dropped with the `DROP TABLE` command stays in the recycle bin for one day by default before being deleted), the Group is deleted automatically.
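To make the cross-database usage concrete, here is a brief illustrative sketch (the `db1`/`db2` database names and the `tbl2` table name are assumptions for the example, not part of this patch): a table in another database can reference the same `__global__` group as long as it uses the same bucket count, distribution column types, and replica allocation, after which the two tables can be joined with a Colocate Join across databases.

```
CREATE TABLE db2.tbl2 (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "__global__group1"
);

-- a join on the distribution column can then run as a Colocate Join across db1 and db2
SELECT a.k1, a.v1, b.v1 FROM db1.tbl a JOIN db2.tbl2 b ON a.k1 = b.k1;
```

As in the regression test added by this patch, `EXPLAIN` on such a query should show the join marked as `COLOCATE` when the plan qualifies.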
diff --git a/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md b/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md index c37eb7bbecf360..e357683dc8b6eb 100644 --- a/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md +++ b/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md @@ -90,6 +90,25 @@ PROPERTIES( 如果指定的 Group 不存在,则 Doris 会自动创建一个只包含当前这张表的 Group。如果 Group 已存在,则 Doris 会检查当前表是否满足 Colocation Group Schema。如果满足,则会创建该表,并将该表加入 Group。同时,表会根据已存在的 Group 中的数据分布规则创建分片和副本。 Group 归属于一个 Database,Group 的名字在一个 Database 内唯一。在内部存储是 Group 的全名为 `dbId_groupName`,但用户只感知 groupName。 + + +2.0 版本中,Doris 支持了跨Database的 Group。在建表时,需使用关键词 `__global__` 作为 Group 名称的前缀。如: + +``` +CREATE TABLE tbl (k1 int, v1 int sum) +DISTRIBUTED BY HASH(k1) +BUCKETS 8 +PROPERTIES( + "colocate_with" = "__global__group1" +); +``` + +`__global__` 前缀的 Group 不再归属于一个 Database,其名称也是全局唯一的。 + +通过创建 Global Group,可以实现跨 Database 的 Colocate Join。 + + + ### 删表 当 Group 中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过 `DROP TABLE` 命令删除后,会在回收站默认停留一天的时间后,再删除),该 Group 也会被自动删除。 @@ -408,4 +427,4 @@ Doris 提供了几个和 Colocation Join 有关的 HTTP Restful API,用于查 其中 Body 是以嵌套数组表示的 BucketsSequence 以及每个 Bucket 中分片分布所在 BE 的 id。 - 注意,使用该命令,可能需要将 FE 的配置 `disable_colocate_relocate` 和 `disable_colocate_balance` 设为 true。即关闭系统自动的 Colocation 副本修复和均衡。否则可能在修改后,会被系统自动重置。 \ No newline at end of file + 注意,使用该命令,可能需要将 FE 的配置 `disable_colocate_relocate` 和 `disable_colocate_balance` 设为 true。即关闭系统自动的 Colocation 副本修复和均衡。否则可能在修改后,会被系统自动重置。 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 6f6b6740b4b690..e220b64fb0cc58 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -56,9 +56,11 @@ public final class FeMetaVersion { public static final int VERSION_117 = 117; // change frontend meta to json, add hostname to MasterInfo public static final int VERSION_118 = 118; + // TablePropertyInfo add db id + public static final int VERSION_119 = 119; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_118; + public static final int VERSION_CURRENT = VERSION_119; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java index ff1c957cd4edd9..09e9437a35dc6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java @@ -90,6 +90,11 @@ public void checkDistribution(DistributionInfo distributionInfo) throws DdlExcep // distribution col type for (int i = 0; i < distributionColTypes.size(); i++) { Type targetColType = distributionColTypes.get(i); + // varchar and string has same distribution hash value if it's data is same + if (targetColType.isVarcharOrStringType() && info.getDistributionColumns().get(i).getType() + .isVarcharOrStringType()) { + continue; + } if (!targetColType.equals(info.getDistributionColumns().get(i).getType())) { ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_TYPE, info.getDistributionColumns().get(i).getName(), targetColType); @@ -98,7 +103,7 @@ public void checkDistribution(DistributionInfo distributionInfo) throws DdlExcep } } - public void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException { + private void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException { for (ReplicaAllocation replicaAlloc : partitionInfo.idToReplicaAllocation.values()) { if (!replicaAlloc.equals(this.replicaAlloc)) { ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_REPLICATION_ALLOCATION, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index 80144d3d61eb76..e0e327c2c3345d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -22,6 +22,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.ColocatePersistInfo; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; @@ -52,16 +53,23 @@ import java.util.stream.Collectors; /** - * maintain the colocate table related indexes and meta + * maintain the colocation table related indexes and meta */ public class ColocateTableIndex implements Writable { private static final Logger LOG = LogManager.getLogger(ColocateTableIndex.class); - public static class GroupId implements Writable { + public static class GroupId implements Writable, GsonPostProcessable { + public static final String GLOBAL_COLOCATE_PREFIX = "__global__"; + @SerializedName(value = "dbId") public Long dbId; @SerializedName(value = "grpId") public Long grpId; + // only available when dbId = 0 + // because for global colocate table, the dbId is 0, so we do not know which db the table belongs to, + // so we use tblId2DbId to record the dbId of each table + @SerializedName(value = "tblId2DbId") + private Map tblId2DbId = Maps.newHashMap(); private GroupId() { } @@ -71,6 +79,23 @@ public GroupId(long dbId, long grpId) { this.grpId = grpId; } + public void addTblId2DbId(long tblId, long dbId) { + Preconditions.checkState(this.dbId == 0); + tblId2DbId.put(tblId, dbId); + } + + public void removeTblId2DbId(long tblId) { + tblId2DbId.remove(tblId); + } + + public long getDbIdByTblId(long tblId) { + return tblId2DbId.get(tblId); + } + + public int 
getTblId2DbIdSize() { + return tblId2DbId.size(); + } + public static GroupId read(DataInput in) throws IOException { if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { GroupId groupId = new GroupId(); @@ -102,6 +127,13 @@ public boolean equals(Object obj) { return dbId.equals(other.dbId) && grpId.equals(other.grpId); } + @Override + public void gsonPostProcess() throws IOException { + if (tblId2DbId == null) { + tblId2DbId = Maps.newHashMap(); + } + } + @Override public int hashCode() { int result = 17; @@ -114,6 +146,18 @@ public int hashCode() { public String toString() { return dbId + "." + grpId; } + + public static String getFullGroupName(long dbId, String colocateGroup) { + if (colocateGroup.startsWith(GLOBAL_COLOCATE_PREFIX)) { + return colocateGroup; + } else { + return dbId + "_" + colocateGroup; + } + } + + public static boolean isGlobalGroupName(String groupName) { + return groupName.startsWith(GLOBAL_COLOCATE_PREFIX); + } } // group_name -> group_id @@ -155,11 +199,10 @@ private void writeUnlock() { // NOTICE: call 'addTableToGroup()' will not modify 'group2BackendsPerBucketSeq' // 'group2BackendsPerBucketSeq' need to be set manually before or after, if necessary. - public GroupId addTableToGroup(long dbId, OlapTable tbl, String groupName, GroupId assignedGroupId) { + public GroupId addTableToGroup(long dbId, OlapTable tbl, String fullGroupName, GroupId assignedGroupId) { writeLock(); try { GroupId groupId = null; - String fullGroupName = dbId + "_" + groupName; if (groupName2Id.containsKey(fullGroupName)) { groupId = groupName2Id.get(fullGroupName); } else { @@ -168,7 +211,11 @@ public GroupId addTableToGroup(long dbId, OlapTable tbl, String groupName, Group groupId = assignedGroupId; } else { // generate a new one - groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId()); + if (GroupId.isGlobalGroupName(fullGroupName)) { + groupId = new GroupId(0, Env.getCurrentEnv().getNextId()); + } else { + groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId()); + } } HashDistributionInfo distributionInfo = (HashDistributionInfo) tbl.getDefaultDistributionInfo(); ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId, @@ -178,6 +225,10 @@ public GroupId addTableToGroup(long dbId, OlapTable tbl, String groupName, Group group2Schema.put(groupId, groupSchema); group2ErrMsgs.put(groupId, ""); } + // for global colocate table, dbId is 0, and we need to save the real dbId of the table + if (groupId.dbId == 0) { + groupId.addTblId2DbId(tbl.getId(), dbId); + } group2Tables.put(groupId, tbl.getId()); table2Group.put(tbl.getId(), groupId); return groupId; @@ -252,6 +303,7 @@ public boolean removeTable(long tableId) { } GroupId groupId = table2Group.remove(tableId); + groupId.removeTblId2DbId(tableId); group2Tables.remove(groupId, tableId); if (!group2Tables.containsKey(groupId)) { // all tables of this group are removed, remove the group @@ -514,14 +566,19 @@ public GroupId changeGroup(long dbId, OlapTable tbl, String oldGroup, String new // remove from old group removeTable(tbl.getId()); } - return addTableToGroup(dbId, tbl, newGroup, assignedGroupId); + String fullNewGroupName = GroupId.getFullGroupName(dbId, newGroup); + return addTableToGroup(dbId, tbl, fullNewGroupName, assignedGroupId); } finally { writeUnlock(); } } public void replayAddTableToGroup(ColocatePersistInfo info) throws MetaNotFoundException { - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(info.getGroupId().dbId); + long dbId = info.getGroupId().dbId; + if (dbId == 0) 
{ + dbId = info.getGroupId().getDbIdByTblId(info.getTableId()); + } + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); OlapTable tbl = (OlapTable) db.getTableOrMetaException(info.getTableId(), org.apache.doris.catalog.Table.TableType.OLAP); writeLock(); @@ -530,7 +587,8 @@ public void replayAddTableToGroup(ColocatePersistInfo info) throws MetaNotFoundE for (Map.Entry>> entry : map.entrySet()) { group2BackendsPerBucketSeq.put(info.getGroupId(), entry.getKey(), entry.getValue()); } - addTableToGroup(info.getGroupId().dbId, tbl, tbl.getColocateGroup(), info.getGroupId()); + String fullGroupName = GroupId.getFullGroupName(dbId, tbl.getColocateGroup()); + addTableToGroup(dbId, tbl, fullGroupName, info.getGroupId()); } finally { writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 632ea3daabf713..579bc51677ad21 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3885,23 +3885,23 @@ public void replayRenameTable(TableInfo tableInfo) throws MetaNotFoundException } // the invoker should keep table's write lock - public void modifyTableColocate(Database db, OlapTable table, String colocateGroup, boolean isReplay, + public void modifyTableColocate(Database db, OlapTable table, String assignedGroup, boolean isReplay, GroupId assignedGroupId) throws DdlException { String oldGroup = table.getColocateGroup(); GroupId groupId = null; - if (!Strings.isNullOrEmpty(colocateGroup)) { - String fullGroupName = db.getId() + "_" + colocateGroup; + if (!Strings.isNullOrEmpty(assignedGroup)) { + String fullAssignedGroupName = GroupId.getFullGroupName(db.getId(), assignedGroup); //When the new name is the same as the old name, we return it to prevent npe if (!Strings.isNullOrEmpty(oldGroup)) { - String oldFullGroupName = db.getId() + "_" + oldGroup; - if (oldFullGroupName.equals(fullGroupName)) { + String oldFullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup); + if (oldFullGroupName.equals(fullAssignedGroupName)) { LOG.warn("modify table[{}] group name same as old group name,skip.", table.getName()); return; } } - ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName); + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullAssignedGroupName); if (groupSchema == null) { // user set a new colocate group, // check if all partitions all this table has same buckets num and same replication number @@ -3938,7 +3938,7 @@ public void modifyTableColocate(Database db, OlapTable table, String colocateGro backendsPerBucketSeq = table.getArbitraryTabletBucketsSeq(); } // change group after getting backends sequence(if has), in case 'getArbitraryTabletBucketsSeq' failed - groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, colocateGroup, assignedGroupId); + groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, assignedGroup, assignedGroupId); if (groupSchema == null) { Preconditions.checkNotNull(backendsPerBucketSeq); @@ -3948,7 +3948,7 @@ public void modifyTableColocate(Database db, OlapTable table, String colocateGro // set this group as unstable colocateTableIndex.markGroupUnstable(groupId, "Colocation group modified by user", false /* edit log is along with modify table log */); - table.setColocateGroup(colocateGroup); + table.setColocateGroup(assignedGroup); } else { // unset colocation group if 
(Strings.isNullOrEmpty(oldGroup)) { @@ -3957,17 +3957,16 @@ public void modifyTableColocate(Database db, OlapTable table, String colocateGro } // when replayModifyTableColocate, we need the groupId info - String fullGroupName = db.getId() + "_" + oldGroup; + String fullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup); groupId = colocateTableIndex.getGroupSchema(fullGroupName).getGroupId(); - colocateTableIndex.removeTable(table.getId()); table.setColocateGroup(null); } if (!isReplay) { Map properties = Maps.newHashMapWithExpectedSize(1); - properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateGroup); - TablePropertyInfo info = new TablePropertyInfo(table.getId(), groupId, properties); + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, assignedGroup); + TablePropertyInfo info = new TablePropertyInfo(db.getId(), table.getId(), groupId, properties); editLog.logModifyTableColocate(info); } LOG.info("finished modify table's colocation property. table: {}, is replay: {}", table.getName(), isReplay); @@ -3975,6 +3974,10 @@ public void modifyTableColocate(Database db, OlapTable table, String colocateGro public void replayModifyTableColocate(TablePropertyInfo info) throws MetaNotFoundException { long dbId = info.getGroupId().dbId; + if (dbId == 0) { + dbId = info.getDbId(); + } + Preconditions.checkState(dbId != 0, "replay modify table colocate failed, table id: " + info.getTableId()); long tableId = info.getTableId(); Map properties = info.getPropertyMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 22d08c07180e40..37030dc088b1a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -146,11 +146,6 @@ private void relocateAndBalanceGroup() { // get all groups Set groupIds = colocateIndex.getAllGroupIds(); for (GroupId groupId : groupIds) { - Database db = env.getInternalCatalog().getDbNullable(groupId.dbId); - if (db == null) { - continue; - } - Table statisticMap = env.getTabletScheduler().getStatisticMap(); if (statisticMap == null) { continue; @@ -159,7 +154,7 @@ private void relocateAndBalanceGroup() { ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId); ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc(); try { - Env.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc); + Env.getCurrentSystemInfo().checkReplicaAllocation(SystemInfoService.DEFAULT_CLUSTER, replicaAlloc); } catch (DdlException e) { colocateIndex.setErrMsgForGroup(groupId, e.getMessage()); continue; @@ -168,7 +163,7 @@ private void relocateAndBalanceGroup() { for (Map.Entry entry : allocMap.entrySet()) { Tag tag = entry.getKey(); - ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName(), tag); + ClusterLoadStatistic statistic = statisticMap.get(SystemInfoService.DEFAULT_CLUSTER, tag); if (statistic == null) { continue; } @@ -182,7 +177,8 @@ private void relocateAndBalanceGroup() { infoService, colocateIndex, groupId, tag); // get all available backends for this group Set beIdsInOtherTag = colocateIndex.getBackendIdsExceptForTag(groupId, tag); - List availableBeIds = getAvailableBeIds(db.getClusterName(), tag, beIdsInOtherTag, infoService); + List availableBeIds = getAvailableBeIds(SystemInfoService.DEFAULT_CLUSTER, tag, beIdsInOtherTag, + 
infoService); // try relocate or balance this group for specified tag List> balancedBackendsPerBucketSeq = Lists.newArrayList(); if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex, @@ -214,11 +210,6 @@ private void matchGroup() { Set groupIds = colocateIndex.getAllGroupIds(); for (GroupId groupId : groupIds) { List tableIds = colocateIndex.getAllTableIds(groupId); - Database db = env.getInternalCatalog().getDbNullable(groupId.dbId); - if (db == null) { - continue; - } - List> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId); if (backendBucketsSeq.isEmpty()) { continue; @@ -227,6 +218,14 @@ private void matchGroup() { String unstableReason = null; OUT: for (Long tableId : tableIds) { + long dbId = groupId.dbId; + if (dbId == 0) { + dbId = groupId.getDbIdByTblId(tableId); + } + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 1a4d212059f1b2..137630b516364b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -435,6 +435,7 @@ public static double analyzeBloomFilterFpp(Map properties) throw return bfFpp; } + // analyze the colocation properties of table public static String analyzeColocate(Map properties) throws AnalysisException { String colocateGroup = null; if (properties != null && properties.containsKey(PROPERTIES_COLOCATE_WITH)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 456eee8cf26469..e2be6ff0a73541 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1443,7 +1443,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa // check colocation if (Env.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { - String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup(); + String fullGroupName = GroupId.getFullGroupName(db.getId(), olapTable.getColocateGroup()); ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName); Preconditions.checkNotNull(groupSchema); groupSchema.checkDistribution(distributionInfo); @@ -2066,7 +2066,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) { throw new AnalysisException("Random distribution for colocate table is unsupported"); } - String fullGroupName = db.getId() + "_" + colocateGroup; + String fullGroupName = GroupId.getFullGroupName(db.getId(), colocateGroup); ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName); if (groupSchema != null) { // group already exist, check if this table can be added to this group @@ -2075,7 +2075,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep } // add table to this group, if group does not exist, create a new one Env.getCurrentColocateIndex() - .addTableToGroup(db.getId(), 
olapTable, colocateGroup, null /* generate group id inside */); + .addTableToGroup(db.getId(), olapTable, fullGroupName, null /* generate group id inside */); olapTable.setColocateGroup(colocateGroup); } } catch (AnalysisException e) { @@ -2277,7 +2277,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep if (result.second) { if (Env.getCurrentColocateIndex().isColocateTable(tableId)) { - // if this is a colocate join table, its table id is already added to colocate group + // if this is a colocate table, its table id is already added to colocate group // so we should remove the tableId here Env.getCurrentColocateIndex().removeTable(tableId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 74e4644fd98a77..9c08ff555ebf7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -485,8 +485,7 @@ public void readFields(DataInput in) throws IOException { break; } case OperationType.OP_MODIFY_TABLE_COLOCATE: { - data = new TablePropertyInfo(); - ((TablePropertyInfo) data).readFields(in); + data = TablePropertyInfo.read(in); isRead = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java index 408839c616215b..459be6460524ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java @@ -46,10 +46,6 @@ public class ColocatePersistInfo implements Writable { @SerializedName(value = "backendsPerBucketSeq") private Map>> backendsPerBucketSeq = Maps.newHashMap(); - public ColocatePersistInfo() { - - } - private ColocatePersistInfo(GroupId groupId, long tableId, Map>> backendsPerBucketSeq) { this.groupId = groupId; this.tableId = tableId; @@ -74,10 +70,6 @@ public static ColocatePersistInfo createForMarkStable(GroupId groupId) { return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap()); } - public static ColocatePersistInfo createForRemoveTable(long tableId) { - return new ColocatePersistInfo(new GroupId(-1, -1), tableId, Maps.newHashMap()); - } - public static ColocatePersistInfo read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ColocatePersistInfo.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java index 0efbe65db65073..959ccaf9c401e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java @@ -18,10 +18,14 @@ package org.apache.doris.persist; import org.apache.doris.catalog.ColocateTableIndex.GroupId; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -32,15 +36,20 @@ * PersistInfo for Table properties */ public class TablePropertyInfo implements Writable { + @SerializedName(value = "dbId") + private long dbId; + 
@SerializedName(value = "tableId") private long tableId; + @SerializedName(value = "propertyMap") private Map propertyMap; + @SerializedName(value = "groupId") private GroupId groupId; - public TablePropertyInfo() { - + private TablePropertyInfo() { } - public TablePropertyInfo(long tableId, GroupId groupId, Map propertyMap) { + public TablePropertyInfo(long dbId, long tableId, GroupId groupId, Map propertyMap) { + this.dbId = dbId; this.tableId = tableId; this.groupId = groupId; this.propertyMap = propertyMap; @@ -50,6 +59,10 @@ public Map getPropertyMap() { return propertyMap; } + public long getDbId() { + return dbId; + } + public long getTableId() { return tableId; } @@ -60,22 +73,22 @@ public GroupId getGroupId() { @Override public void write(DataOutput out) throws IOException { - out.writeLong(tableId); - if (groupId == null) { - out.writeBoolean(false); + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static TablePropertyInfo read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_119) { + TablePropertyInfo info = new TablePropertyInfo(); + info.readFields(in); + return info; } else { - out.writeBoolean(true); - groupId.write(out); - } - int size = propertyMap.size(); - out.writeInt(size); - for (Map.Entry kv : propertyMap.entrySet()) { - Text.writeString(out, kv.getKey()); - Text.writeString(out, kv.getValue()); + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, TablePropertyInfo.class); } } - public void readFields(DataInput in) throws IOException { + @Deprecated + private void readFields(DataInput in) throws IOException { tableId = in.readLong(); if (in.readBoolean()) { groupId = GroupId.read(in); @@ -102,13 +115,14 @@ public boolean equals(Object obj) { TablePropertyInfo info = (TablePropertyInfo) obj; - return tableId == info.tableId && groupId.equals(info.groupId) + return dbId == info.dbId && tableId == info.tableId && groupId.equals(info.groupId) && propertyMap.equals(info.propertyMap); } @Override public String toString() { StringBuilder sb = new StringBuilder(); + sb.append(" db id: ").append(dbId); sb.append(" table id: ").append(tableId); sb.append(" group id: ").append(groupId); sb.append(" propertyMap: ").append(propertyMap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 0537ec5babc87a..43e911674a5ecf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -592,7 +592,7 @@ private boolean dataDistributionMatchEqPredicate(List eqJoinPre } } - //3 the join columns should contains all distribute columns to enable colocate join + //3 the join columns should contain all distribute columns to enable colocate join if (leftJoinColumns.containsAll(leftDistributeColumns) && rightJoinColumns.containsAll(rightDistributeColumns)) { return true; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java index 404ff80aebe574..7e0e2baacda6b8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java @@ -129,7 +129,7 @@ public void testCreateOneTable() throws Exception { Map>> backendIds = index.getBackendsPerBucketSeq(groupId); Assert.assertEquals(1, 
backendIds.get(Tag.DEFAULT_BACKEND_TAG).get(0).size()); - String fullGroupName = dbId + "_" + groupName; + String fullGroupName = GroupId.getFullGroupName(dbId, groupName); Assert.assertEquals(tableId, index.getTableIdByGroup(fullGroupName)); ColocateGroupSchema groupSchema = index.getGroupSchema(fullGroupName); Assert.assertNotNull(groupSchema); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 7583c4cd69ea4e..ceda6e826024be 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -17,66 +17,61 @@ package org.apache.doris.planner; -import org.apache.doris.analysis.CreateDbStmt; -import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.ColocateGroupSchema; +import org.apache.doris.catalog.ColocateTableIndex; +import org.apache.doris.catalog.ColocateTableIndex.GroupId; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.FeConstants; import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QueryStatisticsItem; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.utframe.UtFrameUtils; +import org.apache.doris.utframe.TestWithFeService; import org.apache.commons.lang.StringUtils; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import java.io.File; import java.util.List; -import java.util.UUID; -public class ColocatePlanTest { +public class ColocatePlanTest extends TestWithFeService { public static final String COLOCATE_ENABLE = "COLOCATE"; - private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; - private static ConnectContext ctx; + private static final String GLOBAL_GROUP = "__global__group1"; + private static final String GLOBAL_GROUP2 = "__global__group2"; - @BeforeClass - public static void setUp() throws Exception { + @Override + protected void runBeforeAll() throws Exception { FeConstants.runningUnitTest = true; - UtFrameUtils.createDorisCluster(runningDir, 2); - ctx = UtFrameUtils.createDefaultCtx(); - String createDbStmtStr = "create database db1;"; - CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx); - Env.getCurrentEnv().createDb(createDbStmt); - // create table test_colocate (k1 int ,k2 int, k3 int, k4 int) - // distributed by hash(k1, k2) buckets 10 - // properties ("replication_num" = "2"); - String createColocateTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " + createDatabase("db1"); + createTable("create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," - + "'colocate_with' = 'group1');"; - CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx); - Env.getCurrentEnv().createTable(createColocateTableStmt); - String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3 int, k4 int)" + + "'colocate_with' = 'group1');"); + createTable("create table db1.test(k1 int, k2 int, k3 int, k4 int)" + "partition by range(k1) (partition p1 values less 
than (\"1\"), partition p2 values less than (\"2\"))" - + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')"; - CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); - Env.getCurrentEnv().createTable(createTableStmt); - - String createMultiPartitionTableStmt = "create table db1.test_multi_partition(k1 int, k2 int)" + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')"); + createTable("create table db1.test_multi_partition(k1 int, k2 int)" + "partition by range(k1) (partition p1 values less than(\"1\"), partition p2 values less than (\"2\"))" - + "distributed by hash(k2) buckets 10 properties ('replication_num' = '2', 'colocate_with' = 'group2')"; - CreateTableStmt createMultiTableStmt = (CreateTableStmt) UtFrameUtils - .parseAndAnalyzeStmt(createMultiPartitionTableStmt, ctx); - Env.getCurrentEnv().createTable(createMultiTableStmt); + + "distributed by hash(k2) buckets 10 properties ('replication_num' = '2', 'colocate_with' = 'group2')"); + + // global colocate tables + createDatabase("db2"); + createTable("create table db1.test_global_colocate1(k1 varchar(10), k2 int, k3 int, k4 int) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," + + "'colocate_with' = '" + GLOBAL_GROUP + "');"); + createTable("create table db2.test_global_colocate2(k1 varchar(20), k2 int, k3 int) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," + + "'colocate_with' = '" + GLOBAL_GROUP + "');"); + createTable("create table db2.test_global_colocate3(k1 varchar(20), k2 int, k3 date) " + + "partition by range(k3) (partition p1 values less than(\"2020-01-01\"), partition p2 values less than (\"2020-02-01\")) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," + + "'colocate_with' = '" + GLOBAL_GROUP + "');"); } - @AfterClass - public static void tearDown() { - File file = new File(runningDir); - file.delete(); + @Override + protected int backendNum() { + return 2; } // without @@ -84,9 +79,9 @@ public static void tearDown() { // 2. join: src data has been redistributed @Test public void sqlDistributedSmallerThanData1() throws Exception { - String sql = "explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b " - + "where a.k1=b.k1"; - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + String plan1 = getSQLPlanOrErrorMsg( + "explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b " + + "where a.k1=b.k1"); Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA)); } @@ -96,7 +91,7 @@ public void sqlDistributedSmallerThanData1() throws Exception { public void sqlDistributedSmallerThanData2() throws Exception { String sql = "explain select * from (select k1 from db1.test_colocate group by k1, k2) a , db1.test_colocate b " + "where a.k1=b.k1"; - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + String plan1 = getSQLPlanOrErrorMsg(sql); Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY)); } @@ -105,9 +100,10 @@ public void sqlDistributedSmallerThanData2() throws Exception { // 2. 
hash columns = agg output columns = distributed columns @Test public void sqlAggAndJoinSameAsTableMeta() throws Exception { - String sql = "explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b " - + "where a.k1=b.k1 and a.k2=b.k2"; - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + String sql = + "explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b " + + "where a.k1=b.k1 and a.k2=b.k2"; + String plan1 = getSQLPlanOrErrorMsg(sql); Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); } @@ -119,7 +115,7 @@ public void sqlAggAndJoinSameAsTableMeta() throws Exception { public void sqlAggAndJoinMoreThanTableMeta() throws Exception { String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , " + "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 and a.k3=b.k3"; - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + String plan1 = getSQLPlanOrErrorMsg(sql); Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); } @@ -131,7 +127,7 @@ public void sqlAggAndJoinMoreThanTableMeta() throws Exception { public void sqlAggMoreThanTableMeta() throws Exception { String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , " + "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2"; - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + String plan1 = getSQLPlanOrErrorMsg(sql); Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); } @@ -144,7 +140,7 @@ public void sqlAggMoreThanTableMeta() throws Exception { @Test public void sqlAggWithNonColocateTable() throws Exception { String sql = "explain select k1, k2 from db1.test group by k1, k2"; - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + String plan1 = getSQLPlanOrErrorMsg(sql); Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertFalse(plan1.contains(COLOCATE_ENABLE)); } @@ -156,7 +152,7 @@ public void sqlAggWithNonColocateTable() throws Exception { @Test public void sqlAggWithColocateTable() throws Exception { String sql = "select k1, k2, count(*) from db1.test_multi_partition where k2 = 1 group by k1, k2"; - StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql); + StmtExecutor executor = getSqlStmtExecutor(sql); Planner planner = executor.planner(); Coordinator coordinator = Deencapsulation.getField(executor, "coord"); List scanNodeList = planner.getScanNodes(); @@ -173,8 +169,9 @@ public void sqlAggWithColocateTable() throws Exception { @Test public void checkColocatePlanFragment() throws Exception { - String sql = "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;"; - StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql); + String sql + = "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;"; + StmtExecutor executor = getSqlStmtExecutor(sql); Planner planner = executor.planner(); Coordinator coordinator = Deencapsulation.getField(executor, "coord"); boolean isColocateFragment0 = Deencapsulation.invoke(coordinator, "isColocateFragment", @@ -190,18 +187,120 @@ public void checkColocatePlanFragment() throws Exception { public void rollupAndMoreThanOneInstanceWithoutColocate() 
throws Exception { String createColocateTblStmtStr = "create table db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) " + "distributed by hash(k1, k2, k3) buckets 10 properties('replication_num' = '1');"; - CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx); - Env.getCurrentEnv().createTable(createColocateTableStmt); - + createTable(createColocateTblStmtStr); String sql = "select a.k1, a.k2, sum(a.k3) " + "from db1.test_colocate_one_backend a join[shuffle] db1.test_colocate_one_backend b on a.k1=b.k1 " + "group by rollup(a.k1, a.k2);"; - Deencapsulation.setField(ctx.getSessionVariable(), "parallelExecInstanceNum", 2); - String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Deencapsulation.setField(connectContext.getSessionVariable(), "parallelExecInstanceNum", 2); + String plan1 = getSQLPlanOrErrorMsg(sql); Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertEquals(5, StringUtils.countMatches(plan1, "PLAN FRAGMENT")); - } + @Test + public void testGlobalColocateGroup() throws Exception { + Database db1 = Env.getCurrentEnv().getInternalCatalog().getDbNullable("default_cluster:db1"); + Database db2 = Env.getCurrentEnv().getInternalCatalog().getDbNullable("default_cluster:db2"); + OlapTable tbl1 = (OlapTable) db1.getTableNullable("test_global_colocate1"); + OlapTable tbl2 = (OlapTable) db2.getTableNullable("test_global_colocate2"); + OlapTable tbl3 = (OlapTable) db2.getTableNullable("test_global_colocate3"); + + String sql = "explain select * from (select k1, k2 from " + + "db1.test_global_colocate1 group by k1, k2) a , db2.test_global_colocate2 b " + + "where a.k1=b.k1 and a.k2=b.k2"; + String plan1 = getSQLPlanOrErrorMsg(sql); + Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); + ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema( + GroupId.getFullGroupName(1000, GLOBAL_GROUP)); + Assert.assertNotNull(groupSchema); + GroupId groupId = groupSchema.getGroupId(); + List tableIds = colocateTableIndex.getAllTableIds(groupId); + Assert.assertEquals(3, tableIds.size()); + Assert.assertTrue(tableIds.contains(tbl1.getId())); + Assert.assertTrue(tableIds.contains(tbl2.getId())); + Assert.assertTrue(tableIds.contains(tbl3.getId())); + Assert.assertEquals(3, groupId.getTblId2DbIdSize()); + Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId())); + Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl2.getId())); + Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl3.getId())); + + sql = "explain select * from (select k1, k2 from " + + "db1.test_global_colocate1 group by k1, k2) a , db2.test_global_colocate3 b " + + "where a.k1=b.k1 and a.k2=b.k2"; + plan1 = getSQLPlanOrErrorMsg(sql); + Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); + + String addPartitionStmt + = "alter table db2.test_global_colocate3 add partition p3 values less than (\"2020-03-01\");"; + alterTableSync(addPartitionStmt); + + try { + createTable("create table db1.test_global_colocate4(k1 int, k2 int, k3 int, k4 int) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," + + "'colocate_with' = '" + GLOBAL_GROUP + "');"); + Assert.fail(); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue( + 
e.getMessage().contains("Colocate tables distribution columns must have the same data type")); + List tmpTableIds = colocateTableIndex.getAllTableIds(groupId); + Assert.assertEquals(3, tmpTableIds.size()); + Assert.assertTrue(tmpTableIds.contains(tbl1.getId())); + Assert.assertTrue(tmpTableIds.contains(tbl2.getId())); + Assert.assertTrue(tmpTableIds.contains(tbl3.getId())); + Assert.assertEquals(3, groupId.getTblId2DbIdSize()); + Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId())); + Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl2.getId())); + Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl3.getId())); + } + + // modify table's colocate group + String modifyStmt = "alter table db2.test_global_colocate3 set ('colocate_with' = '');"; + alterTableSync(modifyStmt); + tableIds = colocateTableIndex.getAllTableIds(groupId); + Assert.assertEquals(2, tableIds.size()); + Assert.assertTrue(tableIds.contains(tbl1.getId())); + Assert.assertTrue(tableIds.contains(tbl2.getId())); + Assert.assertEquals(2, groupId.getTblId2DbIdSize()); + Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId())); + Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl2.getId())); + // change table's colocate group + modifyStmt = "alter table db2.test_global_colocate2 set ('colocate_with' = '" + GLOBAL_GROUP2 + "');"; + alterTableSync(modifyStmt); + tableIds = colocateTableIndex.getAllTableIds(groupId); + Assert.assertEquals(1, tableIds.size()); + Assert.assertTrue(tableIds.contains(tbl1.getId())); + Assert.assertEquals(1, groupId.getTblId2DbIdSize()); + Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId())); + + GroupId groupId2 = colocateTableIndex.getGroupSchema( + GroupId.getFullGroupName(1000, GLOBAL_GROUP2)).getGroupId(); + tableIds = colocateTableIndex.getAllTableIds(groupId2); + Assert.assertEquals(1, tableIds.size()); + Assert.assertTrue(tableIds.contains(tbl2.getId())); + Assert.assertEquals(1, groupId2.getTblId2DbIdSize()); + Assert.assertEquals(db2.getId(), groupId2.getDbIdByTblId(tbl2.getId())); + + // checkpoint + // Get currentCatalog first + Env currentEnv = Env.getCurrentEnv(); + // Save real ckptThreadId + long ckptThreadId = currentEnv.getCheckpointer().getId(); + try { + // set checkpointThreadId to current thread id, so that when do checkpoint manually here, + // the Catalog.isCheckpointThread() will return true. 
+ Deencapsulation.setField(Env.class, "checkpointThreadId", Thread.currentThread().getId()); + currentEnv.getCheckpointer().doCheckpoint(); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + // Restore the ckptThreadId + Deencapsulation.setField(Env.class, "checkpointThreadId", ckptThreadId); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index c96e460b2699d5..72ac00d60eb6aa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -617,6 +617,12 @@ protected void addRollup(String sql) throws Exception { Thread.sleep(100); } + protected void alterTableSync(String sql) throws Exception { + AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); + Env.getCurrentEnv().alterTable(alterTableStmt); + Thread.sleep(100); + } + protected void createMv(String sql) throws Exception { CreateMaterializedViewStmt createMaterializedViewStmt = (CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); diff --git a/regression-test/data/correctness_p0/test_colocate_join.out b/regression-test/data/correctness_p0/test_colocate_join.out new file mode 100644 index 00000000000000..aa5795a72f84d7 --- /dev/null +++ b/regression-test/data/correctness_p0/test_colocate_join.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !global1 -- +1 jack 2022-01-01 1 jack 2022-01-01 10 +2 jack1 2022-01-02 2 jack1 2022-01-02 11 +3 jack2 2022-01-03 3 jack2 2022-01-03 12 +4 jack3 2022-02-01 4 jack3 2022-02-01 13 +5 jack4 2022-02-01 5 jack4 2022-02-01 14 + +-- !global2 -- +1 jack 2022-01-01 1 jack 2022-01-01 10 +2 jack1 2022-01-02 2 jack1 2022-01-02 11 +3 jack2 2022-01-03 3 jack2 2022-01-03 12 +4 jack3 2022-02-01 4 jack3 2022-02-01 13 +5 jack4 2022-02-01 5 jack4 2022-02-01 14 + +-- !global3 -- +1 jack 2022-01-01 1 jack 2022-01-01 10 +2 jack1 2022-01-02 2 jack1 2022-01-02 11 +3 jack2 2022-01-03 3 jack2 2022-01-03 12 +4 jack3 2022-02-01 4 jack3 2022-02-01 13 +5 jack4 2022-02-01 5 jack4 2022-02-01 14 + diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy b/regression-test/suites/correctness_p0/test_colocate_join.groovy index e8bd7206b8a769..e5d7ffcbec499f 100644 --- a/regression-test/suites/correctness_p0/test_colocate_join.groovy +++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy @@ -16,11 +16,22 @@ // under the License. 
suite("test_colocate_join") { + def db1 = "test_colocate_join_db1" + def db2 = "test_colocate_join_db2" + sql """ drop database if exists ${db1}""" + sql """ drop database if exists ${db2}""" + sql """ create database if not exists ${db1}""" + sql """ create database if not exists ${db2}""" + sql """ use ${db1}""" + sql """ DROP TABLE IF EXISTS `test_colo1` """ sql """ DROP TABLE IF EXISTS `test_colo2` """ sql """ DROP TABLE IF EXISTS `test_colo3` """ sql """ DROP TABLE IF EXISTS `test_colo4` """ sql """ DROP TABLE IF EXISTS `test_colo5` """ + sql """ DROP TABLE IF EXISTS `test_global_tbl1` """ + sql """ DROP TABLE IF EXISTS `test_global_tbl2` """ + sql """ DROP TABLE IF EXISTS ${db2}.`test_global_tbl3` """ sql """ CREATE TABLE `test_colo1` ( @@ -112,6 +123,46 @@ suite("test_colocate_join") { ); """ + sql """ + create table test_global_tbl1 ( + id int, + name varchar(100), + dt date + ) + distributed by hash(id, name) buckets 4 + properties("colocate_with" = "__global__group1", + "replication_num" = "1"); + """ + + sql """ + create table test_global_tbl2 ( + id int, + name varchar(20), + dt date, + age bigint + ) + distributed by hash(id, name) buckets 4 + properties("colocate_with" = "__global__group1", + "replication_num" = "1"); + """ + + sql """ + create table ${db2}.test_global_tbl3 ( + id int, + name varchar(50), + dt date, + age bigint + ) + partition by range(dt) ( + partition p1 values less than("2022-02-01"), + partition p2 values less than("2022-03-01"), + partition p3 values less than("2022-04-01") + ) + distributed by hash(id, name) buckets 4 + properties("colocate_with" = "__global__group1", + "replication_num" = "1"); + """ + sql """insert into test_colo1 values('1','a',12);""" sql """insert into test_colo2 values('1','a',12);""" sql """insert into test_colo3 values('1','a',12);""" @@ -193,4 +244,64 @@ suite("test_colocate_join") { sql """ DROP TABLE IF EXISTS `tbl1`;""" sql """ DROP TABLE IF EXISTS `tbl2`;""" + sql """insert into ${db1}.test_global_tbl1 values + (1,"jack", "2022-01-01"), + (2,"jack1", "2022-01-02"), + (3,"jack2", "2022-01-03"), + (4,"jack3", "2022-02-01"), + (5,"jack4", "2022-02-01"), + (6, null, "2022-03-01"); + """ + + sql """insert into ${db1}.test_global_tbl2 values + (1,"jack", "2022-01-01", 10), + (2,"jack1", "2022-01-02", 11), + (3,"jack2", "2022-01-03", 12), + (4,"jack3", "2022-02-01", 13), + (5,"jack4", "2022-02-01", 14), + (6,null, "2022-03-01", 15); + """ + + sql """insert into ${db2}.test_global_tbl3 values + (1,"jack", "2022-01-01", 10), + (2,"jack1", "2022-01-02", 11), + (3,"jack2", "2022-01-03", 12), + (4,"jack3", "2022-02-01", 13), + (5,"jack4", "2022-02-01", 14), + (6,null, "2022-03-01", 15); + """ + + order_qt_global1 """select * from ${db1}.test_global_tbl1 a join ${db1}.test_global_tbl2 b on a.id = b.id and a.name = b.name """ + order_qt_global2 """select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name """ + + explain { + sql ("select * from ${db1}.test_global_tbl1 a join ${db1}.test_global_tbl2 b on a.id = b.id and a.name = b.name") + contains "COLOCATE" + } + explain { + sql ("select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name") + contains "COLOCATE" + } + /* add partition */ + sql """alter table ${db2}.test_global_tbl3 add partition p4 values less than("2022-05-01")""" + sql """insert into ${db2}.test_global_tbl3 values (7, "jack7", "2022-04-01", 16)""" + order_qt_global3 """select * from ${db1}.test_global_tbl1 a join 
${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name """ + explain { + sql ("select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name") + contains "COLOCATE" + } + + /* modify group: unset */ + sql """alter table ${db2}.test_global_tbl3 set ("colocate_with" = "");""" + explain { + sql ("select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name") + contains "Tables are not in the same group" + } + + /* modify group: from global to database level */ + sql """alter table ${db1}.test_global_tbl2 set ("colocate_with" = "db_level_group");""" + explain { + sql ("select * from ${db1}.test_global_tbl1 a join ${db1}.test_global_tbl2 b on a.id = b.id and a.name = b.name") + contains "Tables are not in the same group" + } }
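As a closing note on verification: besides the `EXPLAIN ... contains "COLOCATE"` checks used in the regression test above, the broader Colocation Join documentation describes a PROC interface for inspecting group metadata. A hedged sketch of how one might confirm that a `__global__` group exists and which table IDs it contains (the exact output columns may differ by version):

```
-- illustrative only: list colocation groups known to the FE, including __global__ groups
SHOW PROC '/colocation_group';
```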