diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroup.java b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroup.java index 32ac188ee0103b..e7bb43c73d4e7a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroup.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroup.java @@ -15,23 +15,17 @@ package com.starrocks.catalog; import com.google.gson.annotations.SerializedName; -import com.starrocks.common.io.Text; -import com.starrocks.common.io.Writable; -import com.starrocks.persist.gson.GsonUtils; import com.starrocks.qe.ShowResultSetMetaData; import com.starrocks.thrift.TWorkGroup; import com.starrocks.thrift.TWorkGroupType; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -public class ResourceGroup implements Writable { +public class ResourceGroup { public static final String GROUP_TYPE = "type"; public static final String USER = "user"; public static final String ROLE = "role"; @@ -110,11 +104,6 @@ public class ResourceGroup implements Writable { public ResourceGroup() { } - public static ResourceGroup read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, ResourceGroup.class); - } - private List showClassifier(ResourceGroupClassifier classifier) { List row = new ArrayList<>(); row.add(this.name); @@ -156,11 +145,6 @@ public void setVersion(long version) { this.version = version; } - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, GsonUtils.GSON.toJson(this)); - } - public List> show() { if (classifiers.isEmpty()) { return Collections.singletonList(showClassifier(new ResourceGroupClassifier())); diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupClassifier.java b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupClassifier.java index 5661a9e7c17f8d..cd672131287844 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupClassifier.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupClassifier.java @@ -16,21 +16,11 @@ import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; -import com.starrocks.common.io.Text; -import com.starrocks.common.io.Writable; -import com.starrocks.persist.gson.GsonUtils; import com.starrocks.server.GlobalStateMgr; -import com.starrocks.sql.ast.DmlStmt; -import com.starrocks.sql.ast.LoadStmt; -import com.starrocks.sql.ast.QueryStatement; -import com.starrocks.sql.ast.StatementBase; import com.starrocks.thrift.TQueryType; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.net.util.SubnetUtils; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -39,7 +29,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -public class ResourceGroupClassifier implements Writable { +public class ResourceGroupClassifier { public static final Pattern USER_PATTERN = Pattern.compile("^[a-zA-Z][a-zA-Z0-9_]{1,63}/?[.a-zA-Z0-9_-]{0,63}$"); public static final Pattern USE_ROLE_PATTERN = Pattern.compile("^\\w+$"); public static final ImmutableSet SUPPORTED_QUERY_TYPES = @@ -66,11 +56,6 @@ public class ResourceGroupClassifier implements Writable { @SerializedName(value = "planMemCostRange") private CostRange planMemCostRange; - public static ResourceGroupClassifier read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, ResourceGroupClassifier.class); - } - public long getResourceGroupId() { return resourceGroupId; } @@ -131,14 +116,16 @@ public void setPlanCpuCostRange(CostRange planCpuCostRange) { this.planCpuCostRange = planCpuCostRange; } + public CostRange getPlanCpuCostRange() { + return planCpuCostRange; + } + public void setPlanMemCostRange(CostRange planMemCostRange) { this.planMemCostRange = planMemCostRange; } - @Override - public void write(DataOutput out) throws IOException { - String json = GsonUtils.GSON.toJson(this); - Text.writeString(out, json); + public CostRange getPlanMemCostRange() { + return planMemCostRange; } public boolean isSatisfied(String user, List activeRoles, QueryType queryType, String sourceIp, @@ -259,20 +246,6 @@ public enum QueryType { public static QueryType fromTQueryType(TQueryType type) { return type == TQueryType.LOAD ? INSERT : SELECT; } - - public static QueryType fromStatement(StatementBase stmt) { - if (stmt instanceof QueryStatement) { - return SELECT; - } - if (stmt instanceof DmlStmt) { - return INSERT; - } - if (stmt instanceof LoadStmt) { - return INSERT; - } - - return SELECT; - } } /** @@ -286,13 +259,15 @@ public static class CostRange { private static final Pattern STR_RANGE_PATTERN = Pattern.compile(STR_RANGE_REGEX, Pattern.CASE_INSENSITIVE); public static final String FORMAT_STR_RANGE_MESSAGE = "the format must be '[min, max)' " + - "where min and max are double (including infinity and -infinity) " + + "where min and max are finite double " + "and min must be less than max"; + @SerializedName(value = "min") private final double min; + @SerializedName(value = "max") private final double max; - public CostRange(double min, double max) { + private CostRange(double min, double max) { this.min = min; this.max = max; } @@ -304,8 +279,12 @@ public static CostRange fromString(String rangeStr) { } try { - double min = parseDoubleWithInfinity(matcher.group(1)); - double max = parseDoubleWithInfinity(matcher.group(2)); + double min = Double.parseDouble(matcher.group(1)); + double max = Double.parseDouble(matcher.group(2)); + + if (!Double.isFinite(min) || !Double.isFinite(max)) { + return null; + } if (min >= max) { return null; @@ -325,15 +304,5 @@ public boolean contains(double value) { public String toString() { return "[" + min + ", " + max + ")"; } - - private static double parseDoubleWithInfinity(String input) { - if ("infinity".equalsIgnoreCase(input)) { - return Double.POSITIVE_INFINITY; - } else if ("-infinity".equalsIgnoreCase(input)) { - return Double.NEGATIVE_INFINITY; - } else { - return Double.parseDouble(input); - } - } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java index eaf2e3a4ea2749..ebefcd769168eb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java @@ -246,7 +246,7 @@ public Set getAllResourceGroupNames() { @Override public void write(DataOutput out) throws IOException { - List resourceGroups = resourceGroupMap.values().stream().collect(Collectors.toList()); + List resourceGroups = new ArrayList<>(resourceGroupMap.values()); SerializeData data = new SerializeData(); data.resourceGroups = resourceGroups; diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/ResourceGroupOpEntry.java b/fe/fe-core/src/main/java/com/starrocks/persist/ResourceGroupOpEntry.java index 3812d0400cbbd0..b58cc1dd751a03 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/ResourceGroupOpEntry.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/ResourceGroupOpEntry.java @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - package com.starrocks.persist; import com.google.gson.annotations.SerializedName; diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java index b55bdcc0ca9899..67241f0d22a1cd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java @@ -74,6 +74,7 @@ import com.starrocks.thrift.TQueryOptions; import com.starrocks.thrift.TWorkGroup; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -716,6 +717,11 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { } else if (executor.getProxyResultBuffer() != null) { // query statement result.setChannelBufferList(executor.getProxyResultBuffer()); } + + String resourceGroupName = ctx.getAuditEventBuilder().build().resourceGroup; + if (StringUtils.isNotEmpty(resourceGroupName)) { + result.setResource_group_name(resourceGroupName); + } } return result; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java index a7d6e09a795ddf..64f95eb7c1717f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java @@ -110,6 +110,10 @@ public void execute() throws Exception { } } } + + if (result.isSetResource_group_name()) { + ctx.getAuditEventBuilder().setResourceGroup(result.getResource_group_name()); + } } private void afterForward() throws DdlException { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 6411b074b24ef0..662ff91720fb3e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -200,7 +200,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static com.starrocks.qe.CoordinatorPreprocessor.prepareResourceGroup; import static com.starrocks.sql.common.UnsupportedException.unsupportedException; // Do one COM_QUERY process. @@ -477,9 +476,6 @@ public void execute() throws Exception { return; } if (isForwardToLeader()) { - // Write the resource group information to audit log. - prepareResourceGroup(context, ResourceGroupClassifier.QueryType.fromStatement(parsedStmt)); - forwardToLeader(); return; } else { diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 0a4ca74fd4e072..01fed12f9a62d6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -1223,7 +1223,8 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { } // add this log so that we can track this stmt - LOG.info("receive forwarded stmt {} from FE: {}", params.getStmt_id(), clientAddr.getHostname()); + LOG.info("receive forwarded stmt {} from FE: {}", + params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown"); ConnectContext context = new ConnectContext(null); ConnectProcessor processor = new ConnectProcessor(context); TMasterOpResult result = processor.proxyExecute(params); @@ -1963,7 +1964,7 @@ public TImmutablePartitionResult updateImmutablePartition(TImmutablePartitionReq return result; } - public synchronized TImmutablePartitionResult updateImmutablePartitionInternal(TImmutablePartitionRequest request) + public synchronized TImmutablePartitionResult updateImmutablePartitionInternal(TImmutablePartitionRequest request) throws UserException { long dbId = request.getDb_id(); long tableId = request.getTable_id(); @@ -2100,7 +2101,7 @@ private static void buildTablets(PhysicalPartition physicalPartition, List allPartitions = dictTable.getAllPartitionIds(); response.setPartition( OlapTableSink.createPartition( - db.getId(), dictTable, dictTable.supportedAutomaticPartition(), allPartitions)); + db.getId(), dictTable, dictTable.supportedAutomaticPartition(), allPartitions)); response.setLocation(OlapTableSink.createLocation( dictTable, dictTable.getClusterId(), allPartitions, dictTable.enableReplicatedStorage())); response.setNodes_info(GlobalStateMgr.getCurrentState().createNodesInfo(dictTable.getClusterId())); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ResourceGroupAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ResourceGroupAnalyzer.java index 879e0c95f098c6..8fad763d714971 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ResourceGroupAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ResourceGroupAnalyzer.java @@ -142,8 +142,11 @@ public static ResourceGroupClassifier convertPredicateToClassifier(List testCases = ImmutableList.of( + new TestCase("[1000,infinity)", "[2, 100)"), + new TestCase("[-infinity,1000)", "[2, 100)"), + new TestCase("a [1, 10)", "[2, 100)"), new TestCase("[1, 10)", "[2, 100) b"), @@ -1155,8 +1151,6 @@ public TestCase(String planCpuCostRange, String planMemCostRange) { new TestCase("[1000,10)", "[2, 100)"), new TestCase("[1000,-1)", "[2, 100)"), - new TestCase("[1000,-infinity)", "[2, 100)"), - new TestCase("[infinity,1000)", "[2, 100)"), new TestCase("[abc, 1000)", "[2, 100)") ); @@ -1199,6 +1193,10 @@ public TestCase(String planCpuCostRange, String planMemCostRange) { } List testCases = ImmutableList.of( + new TestCase("[1000,Infinity)", "[2, 100)"), + new TestCase("[1000,NaN)", "[2, 100)"), + new TestCase("[-infinity,1000)", "[2, 100)"), + new TestCase("[1, 10]", "[2, 100)"), new TestCase("[1,1 0)", "[2, 100)"), @@ -1209,8 +1207,6 @@ public TestCase(String planCpuCostRange, String planMemCostRange) { new TestCase("[1000,10)", "[2, 100)"), new TestCase("[1000,-1)", "[2, 100)"), - new TestCase("[1000,-infinity)", "[2, 100)"), - new TestCase("[infinity,1000)", "[2, 100)"), new TestCase("[abc, 1000)", "[2, 100)") ); @@ -1224,4 +1220,145 @@ public TestCase(String planCpuCostRange, String planMemCostRange) { starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg_valid_plan_cost_range"); } } + + @Test + public void testSerializeAndDeserialize() throws Exception { + String createSQL1 = "create resource group rg1\n" + + "to (\n" + + " user='rg1_user'," + + " plan_cpu_cost_range='[1, 2)'," + + " plan_mem_cost_range='[-100, 1000)'" + + ")\n" + + " with (" + + " 'mem_limit' = '20%'," + + " 'cpu_core_limit' = '17'," + + " 'concurrency_limit' = '11'," + + " 'type' = 'normal'" + + " );"; + String createSQL2 = "create resource group rg2\n" + + "to (\n" + + " user='rg1_user'," + + " plan_mem_cost_range='[0, 2000)'" + + ")\n" + + " with (" + + " 'mem_limit' = '30%'," + + " 'cpu_core_limit' = '32'," + + " 'concurrency_limit' = '31'," + + " 'type' = 'normal'" + + " );"; + String showResult = + "rg1|17|20.0%|null|0|0|0|11|NORMAL|(weight=3.0, user=rg1_user, plan_cpu_cost_range=[1.0, 2.0), plan_mem_cost_range=[-100.0, 1000.0))\n" + + "rg2|32|30.0%|null|0|0|0|31|NORMAL|(weight=2.0, user=rg1_user, plan_mem_cost_range=[0.0, 2000.0))"; + + starRocksAssert.executeResourceGroupDdlSql(createSQL1); + starRocksAssert.executeResourceGroupDdlSql(createSQL2); + { + List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); + String actual = rowsToString(rows); + Assert.assertEquals(showResult, actual); + } + + // 1. Test serialize and deserialize ResourceGroupMgr. + try (ByteArrayOutputStream bufferOutput = new ByteArrayOutputStream()) { + try (DataOutputStream outputStream = new DataOutputStream(bufferOutput)) { + GlobalStateMgr.getCurrentState().getResourceGroupMgr().write(outputStream); + } + + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1"); + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg2"); + List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); + Assert.assertTrue(rows.isEmpty()); + + try (ByteArrayInputStream bufferInput = new ByteArrayInputStream(bufferOutput.toByteArray()); + DataInputStream inputStream = new DataInputStream(bufferInput)) { + GlobalStateMgr.getCurrentState().getResourceGroupMgr().readFields(inputStream); + } + } + { + List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); + String actual = rowsToString(rows); + Assert.assertEquals(showResult, actual); + } + + // 2. Test serialize and deserialize ResourceGroupOpEntry. + ResourceGroup rg1 = GlobalStateMgr.getCurrentState().getResourceGroupMgr().getResourceGroup("rg1"); + ResourceGroup rg2 = GlobalStateMgr.getCurrentState().getResourceGroupMgr().getResourceGroup("rg2"); + List rgs = ImmutableList.of(rg1, rg2); + for (ResourceGroup rg : rgs) { + ResourceGroupOpEntry opEntryRg = new ResourceGroupOpEntry(TWorkGroupOpType.WORKGROUP_OP_ALTER, rg); + try (ByteArrayOutputStream bufferOutput = new ByteArrayOutputStream()) { + try (DataOutputStream outputStream = new DataOutputStream(bufferOutput)) { + opEntryRg.write(outputStream); + } + + try (ByteArrayInputStream bufferInput = new ByteArrayInputStream(bufferOutput.toByteArray()); + DataInputStream inputStream = new DataInputStream(bufferInput)) { + ResourceGroupOpEntry opEntryRgRead = ResourceGroupOpEntry.read(inputStream); + assertThat(opEntryRgRead).usingRecursiveComparison().isEqualTo(opEntryRg); + } + } + } + + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1"); + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg2"); + } + + @Test + public void testClassifierOnlyWithPlanCost() throws Exception { + String createSQL1 = "create resource group rg1\n" + + "to (\n" + + " plan_cpu_cost_range='[11, 12)'," + + " plan_mem_cost_range='[-100, 11000)'" + + ")\n" + + " with (" + + " 'mem_limit' = '20%'," + + " 'cpu_core_limit' = '17'," + + " 'concurrency_limit' = '11'," + + " 'type' = 'normal'" + + " );"; + String createSQL2 = "create resource group rg2\n" + + "to (\n" + + " plan_cpu_cost_range='[21, 22)'" + + ")\n" + + " with (" + + " 'mem_limit' = '20%'," + + " 'cpu_core_limit' = '16'," + + " 'concurrency_limit' = '11'," + + " 'type' = 'normal'" + + " );"; + String createSQL3 = "create resource group rg3\n" + + "to (\n" + + " plan_mem_cost_range='[-100, 31000)'" + + ")\n" + + " with (" + + " 'mem_limit' = '20%'," + + " 'cpu_core_limit' = '17'," + + " 'concurrency_limit' = '11'," + + " 'type' = 'normal'" + + " );"; + + starRocksAssert.executeResourceGroupDdlSql(createSQL1); + starRocksAssert.executeResourceGroupDdlSql(createSQL2); + starRocksAssert.executeResourceGroupDdlSql(createSQL3); + + List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); + String actual = rowsToString(rows); + String expected = + "rg1|17|20.0%|null|0|0|0|11|NORMAL|(weight=2.0, plan_cpu_cost_range=[11.0, 12.0), plan_mem_cost_range=[-100.0, 11000.0))\n" + + "rg2|16|20.0%|null|0|0|0|11|NORMAL|(weight=1.0, plan_cpu_cost_range=[21.0, 22.0))\n" + + "rg3|17|20.0%|null|0|0|0|11|NORMAL|(weight=1.0, plan_mem_cost_range=[-100.0, 31000.0))"; + Assert.assertEquals(expected, actual); + + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1"); + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg2"); + starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg3"); + } + + @Test + public void testEmptyClassifier() { + Assert.assertThrows( + "Getting analyzing error. Detail message: At least one of ('user', 'role', 'query_type', 'db', " + + "'source_ip', 'plan_cpu_cost_range', 'plan_mem_cost_range') should be given", + SemanticException.class, () -> ResourceGroupAnalyzer.convertPredicateToClassifier(Collections.emptyList())); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/LeaderOpExecutorTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/LeaderOpExecutorTest.java new file mode 100644 index 00000000000000..9300cf1c4be321 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/qe/LeaderOpExecutorTest.java @@ -0,0 +1,119 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe; + +import com.starrocks.analysis.RedirectStatus; +import com.starrocks.common.ClientPool; +import com.starrocks.common.Config; +import com.starrocks.common.FeConstants; +import com.starrocks.pseudocluster.PseudoCluster; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.service.FrontendServiceImpl; +import com.starrocks.sql.ast.StatementBase; +import com.starrocks.thrift.FrontendService; +import com.starrocks.thrift.TMasterOpRequest; +import com.starrocks.thrift.TMasterOpResult; +import com.starrocks.thrift.TNetworkAddress; +import com.starrocks.utframe.MockGenericPool; +import com.starrocks.utframe.StarRocksAssert; +import com.starrocks.utframe.UtFrameUtils; +import org.apache.thrift.TException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class LeaderOpExecutorTest { + private static ConnectContext connectContext; + private static StarRocksAssert starRocksAssert; + private static PseudoCluster cluster; + + @BeforeClass + public static void beforeClass() throws Exception { + Config.bdbje_heartbeat_timeout_second = 60; + Config.bdbje_replica_ack_timeout_second = 60; + Config.bdbje_lock_timeout_second = 60; + // set some parameters to speedup test + Config.tablet_sched_checker_interval_seconds = 1; + Config.tablet_sched_repair_delay_factor_second = 1; + Config.enable_new_publish_mechanism = true; + PseudoCluster.getOrCreateWithRandomPort(true, 1); + GlobalStateMgr.getCurrentState().getTabletChecker().setInterval(1000); + cluster = PseudoCluster.getInstance(); + + FeConstants.runningUnitTest = true; + Config.alter_scheduler_interval_millisecond = 100; + Config.dynamic_partition_enable = true; + Config.dynamic_partition_check_interval_seconds = 1; + Config.enable_experimental_mv = true; + // create connect context + connectContext = UtFrameUtils.createDefaultCtx(); + starRocksAssert = new StarRocksAssert(connectContext); + + starRocksAssert.withDatabase("d1").useDatabase("d1") + .withTable( + "CREATE TABLE d1.t1(k1 int, k2 int, k3 int)" + + " distributed by hash(k1) buckets 3 properties('replication_num' = '1');") + .withTable( + "CREATE TABLE d1.t2(k1 int, k2 int, k3 int)" + + " distributed by hash(k1) buckets 3 properties('replication_num' = '1');"); + } + + @Test + public void testResourceGroupNameInAuditLog() throws Exception { + + String createGroup = "create resource group rg1\n" + + "to\n" + + " (db='d1')\n" + + "with (\n" + + " 'cpu_core_limit' = '1',\n" + + " 'mem_limit' = '50%',\n" + + " 'concurrency_limit' = '20',\n" + + " 'type' = 'normal'\n" + + ");"; + cluster.runSql("d1", createGroup); + + String sql = "insert into t1 select * from t1"; + StatementBase stmtBase = UtFrameUtils.parseStmtWithNewParser(sql, connectContext); + LeaderOpExecutor executor = + new LeaderOpExecutor(stmtBase, stmtBase.getOrigStmt(), connectContext, RedirectStatus.FORWARD_NO_SYNC); + + mockFrontendService(new MockFrontendServiceClient()); + executor.execute(); + + Assert.assertEquals("rg1", connectContext.getAuditEventBuilder().build().resourceGroup); + } + + private static class MockFrontendServiceClient extends FrontendService.Client { + private final FrontendService.Iface frontendService = new FrontendServiceImpl(null); + + public MockFrontendServiceClient() { + super(null); + } + + @Override + public TMasterOpResult forward(TMasterOpRequest params) throws TException { + return frontendService.forward(params); + } + } + + private static void mockFrontendService(MockFrontendServiceClient client) { + ClientPool.frontendPool = new MockGenericPool("leader-op-mocked-pool") { + @Override + public FrontendService.Client borrowObject(TNetworkAddress address, int timeoutMs) { + return client; + } + }; + } +} diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 71c4a589b072d0..02317c27620a96 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -788,6 +788,8 @@ struct TMasterOpResult { 4: optional string state; // for query statement 5: optional list channelBufferList; + + 6: optional string resource_group_name; } struct TIsMethodSupportedRequest {