Skip to content

Commit

Permalink
[Improment]publish workload to BE by tag (apache#38486)
Browse files Browse the repository at this point in the history
## Proposed changes
A workload group's tag property may be three cases as below:
1 empty string, null or '', it could be published to all BE.
2 a value match some BE' location, then the workload group could only be
published to the BE with same tag.
3 not an empty string, but some invalid string which can not math any
BE's location, then it could not be published any BE.
  • Loading branch information
wangbo authored Jul 31, 2024
1 parent dcb6a6f commit 086236e
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;

import org.apache.logging.log4j.LogManager;
Expand All @@ -35,8 +36,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;

public class TopicPublisherThread extends MasterDaemon {
Expand Down Expand Up @@ -126,7 +129,30 @@ public void run() {
try {
address = new TNetworkAddress(be.getHost(), be.getBePort());
client = ClientPool.backendPool.borrowObject(address);
client.publishTopicInfo(request);
// check whether workload group tag math current be
TPublishTopicRequest copiedRequest = request.deepCopy();
if (copiedRequest.isSetTopicMap()) {
Map<TTopicInfoType, List<TopicInfo>> topicMap = copiedRequest.getTopicMap();
List<TopicInfo> topicInfoList = topicMap.get(TTopicInfoType.WORKLOAD_GROUP);
if (topicInfoList != null) {
Set<String> beTagSet = be.getBeWorkloadGroupTagSet();
Iterator<TopicInfo> topicIter = topicInfoList.iterator();
while (topicIter.hasNext()) {
TopicInfo topicInfo = topicIter.next();
if (topicInfo.isSetWorkloadGroupInfo()) {
TWorkloadGroupInfo tWgInfo = topicInfo.getWorkloadGroupInfo();
if (tWgInfo.isSetTag() && !Backend.isMatchWorkloadGroupTag(
tWgInfo.getTag(), beTagSet)) {
// currently TopicInfo could not contain both policy and workload group,
// so we can remove TopicInfo directly.
topicIter.remove();
}
}
}
}
}

client.publishTopicInfo(copiedRequest);
ok = true;
LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}",
be.getHost(), (System.currentTimeMillis() - beginTime), logStr);
Expand Down
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class Tag implements Writable {
public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = "cloud_cluster_private_endpoint";
public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";

public static final String WORKLOAD_GROUP = "workload_group";

public static final ImmutableSet<String> RESERVED_TAG_TYPE = ImmutableSet.of(
TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
public static final ImmutableSet<String> RESERVED_TAG_VALUES = ImmutableSet.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.doris.resource.workloadgroup;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
Expand All @@ -30,7 +32,6 @@
import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -189,9 +190,7 @@ public static WorkloadGroup copyAndUpdate(WorkloadGroup currentWorkloadGroup, Ma
throws DdlException {
Map<String, String> newProperties = new HashMap<>(currentWorkloadGroup.getProperties());
for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
if (!Strings.isNullOrEmpty(kv.getValue())) {
newProperties.put(kv.getKey(), kv.getValue());
}
newProperties.put(kv.getKey(), kv.getValue());
}

checkProperties(newProperties);
Expand Down Expand Up @@ -416,6 +415,18 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx
}
}

String tagStr = properties.get(TAG);
if (!StringUtils.isEmpty(tagStr)) {
String[] tagArr = tagStr.split(",");
for (String tag : tagArr) {
try {
FeNameFormat.checkCommonName("workload group tag name", tag);
} catch (AnalysisException e) {
throw new DdlException("workload group tag name format is illegal, " + tagStr);
}
}
}

}

public long getId() {
Expand Down Expand Up @@ -605,6 +616,11 @@ public TopicInfo toTopicInfo() {
tWorkloadGroupInfo.setRemoteReadBytesPerSecond(Long.valueOf(remoteReadBytesPerSecStr));
}

String tagStr = properties.get(TAG);
if (!StringUtils.isEmpty(tagStr)) {
tWorkloadGroupInfo.setTag(tagStr);
}

TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
String workloadGroupName = stmt.getWorkloadGroupName();
Map<String, String> properties = stmt.getProperties();
if (properties.size() == 0) {
throw new DdlException("alter workload group should contain at least one property");
}
WorkloadGroup newWorkloadGroup;
writeLock();
try {
Expand Down
37 changes: 37 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,22 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -932,4 +937,36 @@ public void setPublishTaskLastTimeAccumulated(Long accumulatedNum) {
this.lastPublishTaskAccumulatedNum = accumulatedNum;
}

public Set<String> getBeWorkloadGroupTagSet() {
Set<String> beTagSet = Sets.newHashSet();
String beTagStr = this.tagMap.get(Tag.WORKLOAD_GROUP);
if (StringUtils.isEmpty(beTagStr)) {
return beTagSet;
}

String[] beTagArr = beTagStr.split(",");
for (String beTag : beTagArr) {
beTagSet.add(beTag.trim());
}

return beTagSet;
}

public static boolean isMatchWorkloadGroupTag(String wgTagStr, Set<String> beTagSet) {
if (StringUtils.isEmpty(wgTagStr)) {
return true;
}
if (beTagSet.isEmpty()) {
return false;
}

String[] wgTagArr = wgTagStr.split(",");
Set<String> wgTagSet = new HashSet<>();
for (String wgTag : wgTagArr) {
wgTagSet.add(wgTag.trim());
}

return !Collections.disjoint(wgTagSet, beTagSet);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,24 @@ public void testAlterWorkloadGroup() throws UserException {
Config.enable_workload_group = true;
ConnectContext context = new ConnectContext();
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
Map<String, String> properties = Maps.newHashMap();
Map<String, String> p0 = Maps.newHashMap();
String name = "g1";
try {
AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, properties);
AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, p0);
workloadGroupMgr.alterWorkloadGroup(stmt1);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("alter workload group should contain at least one property"));
}

p0.put(WorkloadGroup.CPU_SHARE, "10");
try {
AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, p0);
workloadGroupMgr.alterWorkloadGroup(stmt1);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("does not exist"));
}

Map<String, String> properties = Maps.newHashMap();
properties.put(WorkloadGroup.CPU_SHARE, "10");
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ struct TWorkloadGroupInfo {
13: optional i32 spill_threshold_high_watermark
14: optional i64 read_bytes_per_second
15: optional i64 remote_read_bytes_per_second
16: optional string tag
}

enum TWorkloadMetricType {
Expand Down

0 comments on commit 086236e

Please sign in to comment.