Skip to content

Commit

Permalink
[Enhancement] Avoid checking group concurrency_limit when enabling gr…
Browse files Browse the repository at this point in the history
…oup level queue (backport #34398) (#34479) (#39027)

Signed-off-by: zihe.liu <ziheliu1024@gmail.com>
  • Loading branch information
luohaha authored Jan 13, 2024
1 parent 2b81a7b commit f4095e9
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 14 deletions.
11 changes: 10 additions & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,16 @@ Status FragmentExecutor::_prepare_workgroup(const UnifiedExecPlanFragmentParams&
wg = WorkGroupManager::instance()->add_workgroup(wg);
}
DCHECK(wg != nullptr);
RETURN_IF_ERROR(_query_ctx->init_query_once(wg.get()));

const auto& query_options = request.common().query_options;
bool enable_group_level_query_queue = false;
if (query_options.__isset.query_queue_options) {
const auto& queue_options = query_options.query_queue_options;
enable_group_level_query_queue =
queue_options.__isset.enable_group_level_query_queue && queue_options.enable_group_level_query_queue;
}
RETURN_IF_ERROR(_query_ctx->init_query_once(wg.get(), enable_group_level_query_queue));

_fragment_ctx->set_workgroup(wg);
_wg = wg;

Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
});
}

Status QueryContext::init_query_once(workgroup::WorkGroup* wg) {
Status QueryContext::init_query_once(workgroup::WorkGroup* wg, bool enable_group_level_query_queue) {
Status st = Status::OK();
if (wg != nullptr) {
std::call_once(_init_query_once, [this, &st, wg]() {
std::call_once(_init_query_once, [this, &st, wg, enable_group_level_query_queue]() {
this->init_query_begin_time();
auto maybe_token = wg->acquire_running_query_token();
auto maybe_token = wg->acquire_running_query_token(enable_group_level_query_queue);
if (maybe_token.ok()) {
_wg_running_query_token_ptr = std::move(maybe_token.value());
_wg_running_query_token_atomic_ptr = _wg_running_query_token_ptr.get();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
workgroup::WorkGroup* wg = nullptr);
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }

Status init_query_once(workgroup::WorkGroup* wg);
Status init_query_once(workgroup::WorkGroup* wg, bool enable_group_level_query_queue);
/// Release the workgroup token only once to avoid double-free.
/// This method should only be invoked while the QueryContext is still valid,
/// to avoid double-free between the destruction and this method.
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,10 @@ void WorkGroup::decr_num_running_drivers() {
}
}

StatusOr<RunningQueryTokenPtr> WorkGroup::acquire_running_query_token() {
StatusOr<RunningQueryTokenPtr> WorkGroup::acquire_running_query_token(bool enable_group_level_query_queue) {
int64_t old = _num_running_queries.fetch_add(1);
if (_concurrency_limit != ABSENT_CONCURRENCY_LIMIT && old >= _concurrency_limit) {
if (!enable_group_level_query_queue && _concurrency_limit != ABSENT_CONCURRENCY_LIMIT &&
old >= _concurrency_limit) {
_num_running_queries.fetch_sub(1);
_concurrency_overflow_count++;
return Status::TooManyTasks(fmt::format("Exceed concurrency limit: {}", _concurrency_limit));
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/workgroup/work_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
static int128_t create_unique_id(int64_t id, int64_t version) { return (((int128_t)version) << 64) | id; }

Status check_big_query(const QueryContext& query_context);
StatusOr<RunningQueryTokenPtr> acquire_running_query_token();
StatusOr<RunningQueryTokenPtr> acquire_running_query_token(bool enable_group_level_query_queue);
void decr_num_queries();
int64_t num_running_queries() const { return _num_running_queries; }
int64_t num_total_queries() const { return _num_total_queries; }
Expand Down
10 changes: 5 additions & 5 deletions be/test/exec/pipeline/query_context_manger_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ TEST(QueryContextManagerTest, testSetWorkgroup) {
const auto query_id1 = query_ctx1->query_id();

/// Case 1: When all the fragments have come and finished, wg.num_running_queries should become zero.
ASSERT_OK(query_ctx1->init_query_once(wg.get()));
ASSERT_OK(query_ctx1->init_query_once(wg.get())); // Non-first invocations have no side-effects.
ASSERT_ERROR(query_ctx_overloaded->init_query_once(wg.get())); // Exceed concurrency_limit.
ASSERT_OK(query_ctx1->init_query_once(wg.get(), false));
ASSERT_OK(query_ctx1->init_query_once(wg.get(), false)); // Non-first invocations have no side-effects.
ASSERT_ERROR(query_ctx_overloaded->init_query_once(wg.get(), false)); // Exceed concurrency_limit.
ASSERT_EQ(1, wg->num_running_queries());
ASSERT_EQ(1, wg->concurrency_overflow_count());
// All the fragments come.
Expand All @@ -276,8 +276,8 @@ TEST(QueryContextManagerTest, testSetWorkgroup) {
auto* query_ctx2 =
gen_query_ctx(parent_mem_tracker.get(), query_ctx_mgr.get(), 3, 4, 3, 0 /* delivery_timeout */, 300);
const auto query_id2 = query_ctx2->query_id();
ASSERT_OK(query_ctx2->init_query_once(wg.get()));
ASSERT_OK(query_ctx2->init_query_once(wg.get())); // Non-first invocations have no side-effects.
ASSERT_OK(query_ctx2->init_query_once(wg.get(), false));
ASSERT_OK(query_ctx2->init_query_once(wg.get(), false)); // Non-first invocations have no side-effects.
ASSERT_EQ(1, wg->num_running_queries());
for (int i = 2; i < query_ctx2->total_fragments(); ++i) {
auto* cur_query_ctx = query_ctx_mgr->get_or_register(query_id2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import com.starrocks.thrift.TPlanFragmentExecParams;
import com.starrocks.thrift.TQueryGlobals;
import com.starrocks.thrift.TQueryOptions;
import com.starrocks.thrift.TQueryQueueOptions;
import com.starrocks.thrift.TQueryType;
import com.starrocks.thrift.TRuntimeFilterParams;
import com.starrocks.thrift.TScanRangeLocation;
Expand Down Expand Up @@ -173,6 +174,10 @@ public class CoordinatorPreprocessor {
// Resource group
private final TWorkGroup resourceGroup;

// Query-queue flags. All default to false and are filled in through the setters by
// QueryQueueManager.maybeWait().
// Set from isEnableQueue(coord) in maybeWait(); gates whether TQueryQueueOptions is sent at all.
private boolean enableQueue = false;
// Set from needCheckQueue(coord) in maybeWait().
private boolean needCheckQueued = false;
// Set to (enableQueue && GlobalVariable.isEnableGroupLevelQueryQueue()) in maybeWait();
// forwarded as enable_group_level_query_queue in TQueryQueueOptions.
private boolean enableGroupLevelQueue = false;

public CoordinatorPreprocessor(TUniqueId queryId, ConnectContext context, List<PlanFragment> fragments,
List<ScanNode> scanNodes, TDescriptorTable descriptorTable,
TQueryGlobals queryGlobals, TQueryOptions queryOptions) {
Expand Down Expand Up @@ -377,6 +382,30 @@ public TWorkGroup getResourceGroup() {
return resourceGroup;
}

/**
 * Returns the enableQueue flag (false until {@link #setEnableQueue(boolean)} is called).
 * toThriftForCommonParams() only attaches TQueryQueueOptions to the query options when
 * this is true, and forwards the value as enable_global_query_queue.
 */
public boolean isEnableQueue() {
return enableQueue;
}

/**
 * Stores the enableQueue flag. QueryQueueManager.maybeWait() calls this with the result
 * of its isEnableQueue(coord) check.
 *
 * @param enableQueue new value of the flag
 */
public void setEnableQueue(boolean enableQueue) {
this.enableQueue = enableQueue;
}

/**
 * Returns the needCheckQueued flag (false until {@link #setNeedCheckQueued(boolean)} is called).
 * QueryQueueManager.maybeWait() returns early when this is false.
 */
public boolean isNeedCheckQueued() {
return needCheckQueued;
}

/**
 * Stores the needCheckQueued flag. QueryQueueManager.maybeWait() calls this with the
 * result of its needCheckQueue(coord) check.
 *
 * @param needCheckQueued new value of the flag
 */
public void setNeedCheckQueued(boolean needCheckQueued) {
this.needCheckQueued = needCheckQueued;
}

/**
 * Returns the enableGroupLevelQueue flag (false until
 * {@link #setEnableGroupLevelQueue(boolean)} is called). toThriftForCommonParams()
 * forwards the value as enable_group_level_query_queue in TQueryQueueOptions.
 */
public boolean isEnableGroupLevelQueue() {
return enableGroupLevelQueue;
}

/**
 * Stores the enableGroupLevelQueue flag. QueryQueueManager.maybeWait() calls this with
 * (isEnableQueue() && GlobalVariable.isEnableGroupLevelQueryQueue()).
 *
 * @param enableGroupLevelQueue new value of the flag
 */
public void setEnableGroupLevelQueue(boolean enableGroupLevelQueue) {
this.enableGroupLevelQueue = enableGroupLevelQueue;
}

public Map<TNetworkAddress, Integer> getHostToNumbers() {
return hostToNumbers;
}
Expand Down Expand Up @@ -1693,6 +1722,14 @@ private void toThriftForCommonParams(TExecPlanFragmentParams commonParams,
commonParams.adaptive_dop_param.setMax_output_amplification_factor(
sessionVariable.getAdaptiveDopMaxOutputAmplificationFactor());
}
if (isEnableQueue()) {
TQueryQueueOptions queryQueueOptions = new TQueryQueueOptions();
queryQueueOptions.setEnable_global_query_queue(isEnableQueue());
queryQueueOptions.setEnable_group_level_query_queue(isEnableGroupLevelQueue());

TQueryOptions queryOptions = commonParams.getQuery_options();
queryOptions.setQuery_queue_options(queryQueueOptions);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ public static QueryQueueManager getInstance() {
}

public void maybeWait(ConnectContext context, Coordinator coord) throws UserException, InterruptedException {
if (!needCheckQueue(coord) || !isEnableQueue(coord)) {
CoordinatorPreprocessor coordPrepare = coord.getPrepareInfo();
coordPrepare.setNeedCheckQueued(needCheckQueue(coord));
coordPrepare.setEnableQueue(isEnableQueue(coord));
coordPrepare.setEnableGroupLevelQueue(coordPrepare.isEnableQueue() && GlobalVariable.isEnableGroupLevelQueryQueue());

if (!coordPrepare.isNeedCheckQueued() || !coordPrepare.isEnableQueue()) {
return;
}

Expand Down
12 changes: 12 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ enum TOverflowMode {
REPORT_ERROR = 1;
}

// Query-queue switches that FE attaches to TQueryOptions.query_queue_options.
// FE only sets this struct when the query queue is enabled for the query.
struct TQueryQueueOptions {
// Filled by FE from CoordinatorPreprocessor.isEnableQueue().
1: optional bool enable_global_query_queue;
// Filled by FE from CoordinatorPreprocessor.isEnableGroupLevelQueue(). When set and true,
// BE's WorkGroup::acquire_running_query_token() skips the concurrency_limit check.
2: optional bool enable_group_level_query_queue;
}

// Query options with their respective defaults
struct TQueryOptions {
2: optional i32 max_errors = 0
Expand Down Expand Up @@ -222,6 +227,13 @@ struct TQueryOptions {

104: optional TOverflowMode overflow_mode = TOverflowMode.OUTPUT_NULL;
105: optional bool use_column_pool = true;

106: optional bool enable_agg_spill_preaggregation;
107: optional i64 global_runtime_filter_build_max_size;
108: optional i64 runtime_filter_rpc_http_min_size;
109: optional i64 big_query_profile_second_threshold;

110: optional TQueryQueueOptions query_queue_options;
}


Expand Down

0 comments on commit f4095e9

Please sign in to comment.