Skip to content

Commit

Permalink
[Enhancement] rename realtime resource group to short_query (#10222) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ZiheLiu authored Aug 20, 2022
1 parent 073c7f4 commit cf46ea4
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 51 deletions.
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/pipeline_driver_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void WorkGroupDriverQueue::update_statistics(const DriverRawPtr driver) {

// Update bandwidth control information.
_update_bandwidth_control_period();
if (!wg_entity->is_rt_wg()) {
if (!wg_entity->is_sq_wg()) {
_bandwidth_usage_ns += runtime_ns;
}

Expand Down Expand Up @@ -309,10 +309,10 @@ bool WorkGroupDriverQueue::should_yield(const DriverRawPtr driver, int64_t unacc

bool WorkGroupDriverQueue::_throttled(const workgroup::WorkGroupDriverSchedEntity* wg_entity,
int64_t unaccounted_runtime_ns) const {
if (wg_entity->is_rt_wg()) {
if (wg_entity->is_sq_wg()) {
return false;
}
if (!workgroup::WorkGroupManager::instance()->is_rt_wg_running()) {
if (!workgroup::WorkGroupManager::instance()->is_sq_wg_running()) {
return false;
}

Expand Down
14 changes: 7 additions & 7 deletions be/src/exec/pipeline/pipeline_driver_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class WorkGroupDriverQueue : public FactoryMethod<DriverQueue, WorkGroupDriverQu
workgroup::WorkGroupDriverSchedEntity* _take_next_wg();
// _update_min_wg is invoked when an entity is enqueued or dequeued from _wg_entities.
void _update_min_wg();
// Apply hard bandwidth control to non-realtime workgroups, when there are queries of the realtime workgroup.
// Apply hard bandwidth control to non-short-query workgroups, when there are queries of the short-query workgroup.
bool _throttled(const workgroup::WorkGroupDriverSchedEntity* wg_entity, int64_t unaccounted_runtime_ns = 0) const;
// _update_bandwidth_control_period resets period_end_ns and period_usage_ns, when a new period comes.
// It is invoked when taking a task to execute or an executed task is finished.
Expand Down Expand Up @@ -206,15 +206,15 @@ class WorkGroupDriverQueue : public FactoryMethod<DriverQueue, WorkGroupDriverQu
std::atomic<int64_t> _min_vruntime_ns = std::numeric_limits<int64_t>::max();
std::atomic<workgroup::WorkGroupDriverSchedEntity*> _min_wg_entity = nullptr;

// Hard bandwidth control to non-realtime workgroups.
// - The control period is 100ms, and the total quota of non-realtime workgroups is 100ms*(vCPUs-rt_wg.cpu_limit).
// - The non-realtime workgroups cannot be executed in the current period, if their usage exceeds quota.
// - When a new period comes, penalize the non-realtime workgroups according to the previous bandwidth usage.
// Hard bandwidth control to non-short-query workgroups.
// - The control period is 100ms, and the total quota of non-short-query workgroups is 100ms*(vCPUs-rt_wg.cpu_limit).
// - The non-short-query workgroups cannot be executed in the current period, if their usage exceeds quota.
// - When a new period comes, penalize the non-short-query workgroups according to the previous bandwidth usage.
// - If usage <= quota, don't penalize it.
// - If quota < usage <= 2*quota, set the new usage to `usage-quota`.
// - Otherwise, set the new usage to quota to prevent them from being executed in the new period.
// Whether to apply the bandwidth control is decided by whether there are queries of the realtime workgroup.
// - If there are queries of the realtime workgroup, apply the control.
// Whether to apply the bandwidth control is decided by whether there are queries of the short-query workgroup.
// - If there are queries of the short-query workgroup, apply the control.
// - Otherwise, don't apply the control.
int64_t _bandwidth_control_period_end_ns = 0;
std::atomic<int64_t> _bandwidth_usage_ns = 0;
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/workgroup/scan_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void WorkGroupScanTaskQueue::update_statistics(WorkGroup* wg, int64_t runtime_ns

// Update bandwidth control information.
_update_bandwidth_control_period();
if (!wg_entity->is_rt_wg()) {
if (!wg_entity->is_sq_wg()) {
_bandwidth_usage_ns += runtime_ns;
}

Expand Down Expand Up @@ -130,10 +130,10 @@ bool WorkGroupScanTaskQueue::should_yield(const WorkGroup* wg, int64_t unaccount

bool WorkGroupScanTaskQueue::_throttled(const workgroup::WorkGroupScanSchedEntity* wg_entity,
int64_t unaccounted_runtime_ns) const {
if (wg_entity->is_rt_wg()) {
if (wg_entity->is_sq_wg()) {
return false;
}
if (!workgroup::WorkGroupManager::instance()->is_rt_wg_running()) {
if (!workgroup::WorkGroupManager::instance()->is_sq_wg_running()) {
return false;
}

Expand Down
14 changes: 7 additions & 7 deletions be/src/exec/workgroup/scan_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class WorkGroupScanTaskQueue final : public ScanTaskQueue {
workgroup::WorkGroupScanSchedEntity* _take_next_wg();
// _update_min_wg is invoked when an entity is enqueued or dequeued from _wg_entities.
void _update_min_wg();
// Apply hard bandwidth control to non-realtime workgroups, when there are queries of the realtime workgroup.
// Apply hard bandwidth control to non-short-query workgroups, when there are queries of the short-query workgroup.
bool _throttled(const workgroup::WorkGroupScanSchedEntity* wg_entity, int64_t unaccounted_runtime_ns = 0) const;
// _update_bandwidth_control_period resets period_end_ns and period_usage_ns, when a new period comes.
// It is invoked when taking a task to execute or an executed task is finished.
Expand Down Expand Up @@ -137,15 +137,15 @@ class WorkGroupScanTaskQueue final : public ScanTaskQueue {
std::atomic<int64_t> _min_vruntime_ns = std::numeric_limits<int64_t>::max();
std::atomic<workgroup::WorkGroupScanSchedEntity*> _min_wg_entity = nullptr;

// Hard bandwidth control to non-realtime workgroups.
// - The control period is 100ms, and the total quota of non-realtime workgroups is 100ms*(vCPUs-rt_wg.cpu_limit).
// - The non-realtime workgroups cannot be executed in the current period, if their usage exceeds quota.
// - When a new period comes, penalize the non-realtime workgroups according to the previous bandwidth usage.
// Hard bandwidth control to non-short-query workgroups.
// - The control period is 100ms, and the total quota of non-short-query workgroups is 100ms*(vCPUs-rt_wg.cpu_limit).
// - The non-short-query workgroups cannot be executed in the current period, if their usage exceeds quota.
// - When a new period comes, penalize the non-short-query workgroups according to the previous bandwidth usage.
// - If usage <= quota, don't penalize it.
// - If quota < usage <= 2*quota, set the new usage to `usage-quota`.
// - Otherwise, set the new usage to quota to prevent them from being executed in the new period.
// Whether to apply the bandwidth control is decided by whether there are queries of the realtime workgroup.
// - If there are queries of the realtime workgroup, apply the control.
// Whether to apply the bandwidth control is decided by whether there are queries of the short-query workgroup.
// - If there are queries of the short-query workgroup, apply the control.
// - Otherwise, don't apply the control.
int64_t _bandwidth_control_period_end_ns = 0;
std::atomic<int64_t> _bandwidth_usage_ns = 0;
Expand Down
14 changes: 7 additions & 7 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ int64_t WorkGroupSchedEntity<Q>::cpu_limit() const {
}

template <typename Q>
bool WorkGroupSchedEntity<Q>::is_rt_wg() const {
return _workgroup->is_rt_wg();
bool WorkGroupSchedEntity<Q>::is_sq_wg() const {
return _workgroup->is_sq_wg();
}

template class WorkGroupSchedEntity<pipeline::DriverQueue>;
Expand Down Expand Up @@ -129,17 +129,17 @@ void WorkGroup::incr_num_running_drivers() {
++_num_running_drivers;
++_acc_num_drivers;

if (is_rt_wg()) {
WorkGroupManager::instance()->incr_num_running_rt_drivers();
if (is_sq_wg()) {
WorkGroupManager::instance()->incr_num_running_sq_drivers();
}
}

void WorkGroup::decr_num_running_drivers() {
int64_t old = _num_running_drivers.fetch_sub(1);
DCHECK_GT(old, 0);

if (is_rt_wg()) {
WorkGroupManager::instance()->decr_num_running_rt_drivers();
if (is_sq_wg()) {
WorkGroupManager::instance()->decr_num_running_sq_drivers();
}
}

Expand Down Expand Up @@ -386,7 +386,7 @@ void WorkGroupManager::create_workgroup_unlocked(const WorkGroupPtr& wg, UniqueL
_workgroups[unique_id] = wg;

_sum_cpu_limit += wg->cpu_limit();
if (wg->is_rt_wg()) {
if (wg->is_sq_wg()) {
_rt_cpu_limit = wg->cpu_limit();
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/workgroup/work_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class WorkGroupSchedEntity {
void set_in_queue(Q* in_queue) { _in_queue = in_queue; }

int64_t cpu_limit() const;
bool is_rt_wg() const;
bool is_sq_wg() const;

int64_t vruntime_ns() const { return _vruntime_ns; }
int64_t runtime_ns() const { return _vruntime_ns * cpu_limit(); }
Expand Down Expand Up @@ -97,7 +97,7 @@ class WorkGroup {
size_t mem_limit() const { return _memory_limit; }
int64_t mem_limit_bytes() const { return _memory_limit_bytes; }

bool is_rt_wg() const { return _type == WorkGroupType::WG_REALTIME; }
bool is_sq_wg() const { return _type == WorkGroupType::WG_SHORT_QUERY; }

WorkGroupDriverSchedEntity* driver_sched_entity() { return &_driver_sched_entity; }
const WorkGroupDriverSchedEntity* driver_sched_entity() const { return &_driver_sched_entity; }
Expand Down Expand Up @@ -208,9 +208,9 @@ class WorkGroupManager {
std::vector<TWorkGroup> list_workgroups();
std::vector<TWorkGroup> list_all_workgroups();

void incr_num_running_rt_drivers() { _num_running_rt_drivers++; }
void decr_num_running_rt_drivers() { _num_running_rt_drivers--; }
bool is_rt_wg_running() const { return _num_running_rt_drivers > 0; }
void incr_num_running_sq_drivers() { _num_running_sq_drivers++; }
void decr_num_running_sq_drivers() { _num_running_sq_drivers--; }
bool is_sq_wg_running() const { return _num_running_sq_drivers > 0; }
size_t normal_workgroup_cpu_hard_limit() const;

void update_metrics();
Expand All @@ -234,7 +234,7 @@ class WorkGroupManager {
std::unordered_map<int64_t, int64_t> _workgroup_versions;
std::list<int128_t> _workgroup_expired_versions;

std::atomic<size_t> _num_running_rt_drivers = 0;
std::atomic<size_t> _num_running_sq_drivers = 0;
std::atomic<size_t> _sum_cpu_limit = 0;
std::atomic<size_t> _rt_cpu_limit = 0;

Expand Down
17 changes: 9 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/catalog/WorkGroupMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public class WorkGroupMgr implements Writable {
private Map<String, WorkGroup> workGroupMap = new HashMap<>();
private Map<Long, WorkGroup> id2WorkGroupMap = new HashMap<>();

// Record the current realtime resource group.
// Record the current short_query resource group.
// There can be only one realtime resource group.
private WorkGroup rtResourceGroup = null;
private WorkGroup shortQueryResourceGroup = null;

private Map<Long, WorkGroupClassifier> classifierMap = new HashMap<>();
private List<TWorkGroupOp> workGroupOps = new ArrayList<>();
Expand Down Expand Up @@ -95,9 +95,10 @@ public void createWorkGroup(CreateWorkGroupStmt stmt) throws DdlException {
}
}

if (wg.getWorkGroupType() == TWorkGroupType.WG_REALTIME && rtResourceGroup != null) {
if (wg.getWorkGroupType() == TWorkGroupType.WG_SHORT_QUERY && shortQueryResourceGroup != null) {
throw new DdlException(
String.format("There can be only one realtime RESOURCE_GROUP (%s)", rtResourceGroup.getName()));
String.format("There can be only one short_query RESOURCE_GROUP (%s)",
shortQueryResourceGroup.getName()));
}

wg.setId(globalStateMgr.getCurrentState().getNextId());
Expand Down Expand Up @@ -372,8 +373,8 @@ private void removeWorkGroupInternal(String name) {
for (WorkGroupClassifier classifier : wg.classifiers) {
classifierMap.remove(classifier.getId());
}
if (wg.getWorkGroupType() == TWorkGroupType.WG_REALTIME) {
rtResourceGroup = null;
if (wg.getWorkGroupType() == TWorkGroupType.WG_SHORT_QUERY) {
shortQueryResourceGroup = null;
}
}

Expand All @@ -383,8 +384,8 @@ private void addWorkGroupInternal(WorkGroup wg) {
for (WorkGroupClassifier classifier : wg.classifiers) {
classifierMap.put(classifier.getId(), classifier);
}
if (wg.getWorkGroupType() == TWorkGroupType.WG_REALTIME) {
rtResourceGroup = wg;
if (wg.getWorkGroupType() == TWorkGroupType.WG_SHORT_QUERY) {
shortQueryResourceGroup = wg;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ public static void analyzeProperties(WorkGroup workgroup, Map<String, String> pr
try {
workgroup.setWorkGroupType(TWorkGroupType.valueOf("WG_" + value.toUpperCase()));
if (workgroup.getWorkGroupType() != TWorkGroupType.WG_NORMAL &&
workgroup.getWorkGroupType() != TWorkGroupType.WG_REALTIME) {
throw new SemanticException("Only support 'normal' and 'realtime' type");
workgroup.getWorkGroupType() != TWorkGroupType.WG_SHORT_QUERY) {
throw new SemanticException("Only support 'normal' and 'short_query' type");
}
} catch (Exception ignored) {
throw new SemanticException("Only support 'normal' and 'realtime' type");
throw new SemanticException("Only support 'normal' and 'short_query' type");
}
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void testCreateDResourceGroupWithIllegalProperty() throws Exception {
" 'concurrency_limit' = '11',\n" +
" 'type' = 'illegal-type'" +
");";
Assert.assertThrows("Only support 'normal' and 'realtime' type",
Assert.assertThrows("Only support 'normal' and 'short_query' type",
SemanticException.class, () -> starRocksAssert.executeWorkGroupDdlSql(illegalTypeSql));

String illegalDefaultTypeSql = "create resource group rg_unknown\n" +
Expand All @@ -280,7 +280,7 @@ public void testCreateDResourceGroupWithIllegalProperty() throws Exception {
" 'concurrency_limit' = '11',\n" +
" 'type' = 'default'" +
");";
Assert.assertThrows("Only support 'normal' and 'realtime' type",
Assert.assertThrows("Only support 'normal' and 'short_query' type",
SemanticException.class, () -> starRocksAssert.executeWorkGroupDdlSql(illegalDefaultTypeSql));
}

Expand Down Expand Up @@ -578,15 +578,15 @@ public void testIllegalClassifier() {
}

@Test
public void testRealtimeResourceGroup() throws Exception {
public void testShortQueryResourceGroup() throws Exception {
String createRtRg1ReplaceSql = "create resource group if not exists or replace rg1\n" +
"to\n" +
" (`db`='db1')\n" +
"with (\n" +
" 'cpu_core_limit' = '25',\n" +
" 'mem_limit' = '80%',\n" +
" 'concurrency_limit' = '10',\n" +
" 'type' = 'realtime'\n" +
" 'type' = 'short_query'\n" +
");";

String createNormalRg1ReplaceSql = "create resource group if not exists or replace rg1\n" +
Expand All @@ -606,7 +606,7 @@ public void testRealtimeResourceGroup() throws Exception {
" 'cpu_core_limit' = '25',\n" +
" 'mem_limit' = '80%',\n" +
" 'concurrency_limit' = '10',\n" +
" 'type' = 'realtime'\n" +
" 'type' = 'short_query'\n" +
");";

String createNormalRg2ReplaceSql = "create resource group if not exists or replace rg2\n" +
Expand All @@ -631,7 +631,7 @@ public void testRealtimeResourceGroup() throws Exception {

// Create normal rg2 and fail to replace it with realtime rg2.
starRocksAssert.executeWorkGroupDdlSql(createNormalRg2ReplaceSql);
Assert.assertThrows("There can be only one realtime RESOURCE_GROUP (rg1)",
Assert.assertThrows("There can be only one short_query RESOURCE_GROUP (rg1)",
DdlException.class, () -> starRocksAssert.executeWorkGroupDdlSql(createRtRg2ReplaceSql));

// Replace realtime rg1 with normal rg1, and create realtime rg2.
Expand Down
4 changes: 3 additions & 1 deletion gensrc/thrift/WorkGroup.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ namespace cpp starrocks
namespace java com.starrocks.thrift

enum TWorkGroupType {
// Not suppported.
WG_REALTIME,
WG_NORMAL,
WG_DEFAULT
WG_DEFAULT,
WG_SHORT_QUERY
}

struct TWorkGroup {
Expand Down

0 comments on commit cf46ea4

Please sign in to comment.