Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Avoid checking group concurrency_limit when enabling group level queue #34398

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,16 @@ Status FragmentExecutor::_prepare_workgroup(const UnifiedExecPlanFragmentParams&
wg = WorkGroupManager::instance()->add_workgroup(wg);
}
DCHECK(wg != nullptr);
RETURN_IF_ERROR(_query_ctx->init_query_once(wg.get()));

const auto& query_options = request.common().query_options;
bool enable_group_level_query_queue = false;
if (query_options.__isset.query_queue_options) {
const auto& queue_options = query_options.query_queue_options;
enable_group_level_query_queue =
queue_options.__isset.enable_group_level_query_queue && queue_options.enable_group_level_query_queue;
}
RETURN_IF_ERROR(_query_ctx->init_query_once(wg.get(), enable_group_level_query_queue));

_fragment_ctx->set_workgroup(wg);
_wg = wg;

ZiheLiu marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
});
}

Status QueryContext::init_query_once(workgroup::WorkGroup* wg) {
Status QueryContext::init_query_once(workgroup::WorkGroup* wg, bool enable_group_level_query_queue) {
Status st = Status::OK();
if (wg != nullptr) {
std::call_once(_init_query_once, [this, &st, wg]() {
std::call_once(_init_query_once, [this, &st, wg, enable_group_level_query_queue]() {
this->init_query_begin_time();
auto maybe_token = wg->acquire_running_query_token();
auto maybe_token = wg->acquire_running_query_token(enable_group_level_query_queue);
if (maybe_token.ok()) {
_wg_running_query_token_ptr = std::move(maybe_token.value());
_wg_running_query_token_atomic_ptr = _wg_running_query_token_ptr.get();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
workgroup::WorkGroup* wg = nullptr);
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }

Status init_query_once(workgroup::WorkGroup* wg);
Status init_query_once(workgroup::WorkGroup* wg, bool enable_group_level_query_queue);
/// Release the workgroup token only once to avoid double-free.
/// This method should only be invoked while the QueryContext is still valid,
/// to avoid double-free between the destruction and this method.
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,10 @@ void WorkGroup::decr_num_running_drivers() {
}
}

StatusOr<RunningQueryTokenPtr> WorkGroup::acquire_running_query_token() {
StatusOr<RunningQueryTokenPtr> WorkGroup::acquire_running_query_token(bool enable_group_level_query_queue) {
int64_t old = _num_running_queries.fetch_add(1);
if (_concurrency_limit != ABSENT_CONCURRENCY_LIMIT && old >= _concurrency_limit) {
if (!enable_group_level_query_queue && _concurrency_limit != ABSENT_CONCURRENCY_LIMIT &&
old >= _concurrency_limit) {
_num_running_queries.fetch_sub(1);
_concurrency_overflow_count++;
return Status::TooManyTasks(fmt::format("Exceed concurrency limit: {}", _concurrency_limit));
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/workgroup/work_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
static int128_t create_unique_id(int64_t id, int64_t version) { return (((int128_t)version) << 64) | id; }

Status check_big_query(const QueryContext& query_context);
StatusOr<RunningQueryTokenPtr> acquire_running_query_token();
StatusOr<RunningQueryTokenPtr> acquire_running_query_token(bool enable_group_level_query_queue);
void decr_num_queries();
int64_t num_running_queries() const { return _num_running_queries; }
int64_t num_total_queries() const { return _num_total_queries; }
Expand Down
10 changes: 5 additions & 5 deletions be/test/exec/pipeline/query_context_manger_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ TEST(QueryContextManagerTest, testSetWorkgroup) {
const auto query_id1 = query_ctx1->query_id();

/// Case 1: When all the fragments have come and finished, wg.num_running_queries should become to zero.
ASSERT_OK(query_ctx1->init_query_once(wg.get()));
ASSERT_OK(query_ctx1->init_query_once(wg.get())); // None-first invocations have no side-effects.
ASSERT_ERROR(query_ctx_overloaded->init_query_once(wg.get())); // Exceed concurrency_limit.
ASSERT_OK(query_ctx1->init_query_once(wg.get(), false));
ASSERT_OK(query_ctx1->init_query_once(wg.get(), false)); // None-first invocations have no side-effects.
ASSERT_ERROR(query_ctx_overloaded->init_query_once(wg.get(), false)); // Exceed concurrency_limit.
ASSERT_EQ(1, wg->num_running_queries());
ASSERT_EQ(1, wg->concurrency_overflow_count());
// All the fragments comes.
Expand All @@ -276,8 +276,8 @@ TEST(QueryContextManagerTest, testSetWorkgroup) {
auto* query_ctx2 =
gen_query_ctx(parent_mem_tracker.get(), query_ctx_mgr.get(), 3, 4, 3, 0 /* delivery_timeout */, 300);
const auto query_id2 = query_ctx2->query_id();
ASSERT_OK(query_ctx2->init_query_once(wg.get()));
ASSERT_OK(query_ctx2->init_query_once(wg.get())); // None-first invocations have no side-effects.
ASSERT_OK(query_ctx2->init_query_once(wg.get(), false));
ASSERT_OK(query_ctx2->init_query_once(wg.get(), false)); // None-first invocations have no side-effects.
ASSERT_EQ(1, wg->num_running_queries());
for (int i = 2; i < query_ctx2->total_fragments(); ++i) {
auto* cur_query_ctx = query_ctx_mgr->get_or_register(query_id2);
Expand Down
31 changes: 3 additions & 28 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryQueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@
import com.starrocks.common.UserException;
import com.starrocks.metric.MetricRepo;
import com.starrocks.metric.ResourceGroupMetricMgr;
import com.starrocks.planner.ScanNode;
import com.starrocks.planner.SchemaScanNode;
import com.starrocks.qe.scheduler.RecoverableException;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.qe.scheduler.slot.LogicalSlot;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Frontend;
import com.starrocks.thrift.TWorkGroup;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -51,7 +49,8 @@ public static QueryQueueManager getInstance() {
}

public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws UserException, InterruptedException {
if (!needCheckQueue(coord) || !isEnableQueue(coord)) {
JobSpec jobSpec = coord.getJobSpec();
if (!jobSpec.isNeedQueued() || !jobSpec.isEnableQueue()) {
return;
}

Expand Down Expand Up @@ -109,30 +108,6 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U
}
}

public boolean isEnableQueue(DefaultCoordinator coord) {
if (coord.getJobSpec().isStatisticsJob()) {
return GlobalVariable.isEnableQueryQueueStatistic();
}

if (coord.isLoadType()) {
return GlobalVariable.isEnableQueryQueueLoad();
}

return GlobalVariable.isEnableQueryQueueSelect();
}

public boolean needCheckQueue(DefaultCoordinator coord) {
if (!coord.getJobSpec().isNeedQueued()) {
return false;
}

// The queries only using schema meta will never been queued, because a MySQL client will
// query schema meta after the connection is established.
List<ScanNode> scanNodes = coord.getScanNodes();
boolean notNeed = scanNodes.isEmpty() || scanNodes.stream().allMatch(SchemaScanNode.class::isInstance);
return !notNeed;
}

private LogicalSlot createSlot(DefaultCoordinator coord) throws UserException {
Pair<String, Integer> selfIpAndPort = GlobalStateMgr.getCurrentState().getNodeMgr().getSelfIpAndRpcPort();
Frontend frontend = GlobalStateMgr.getCurrentState().getFeByHost(selfIpAndPort.first);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.starrocks.thrift.TPlanFragmentDestination;
import com.starrocks.thrift.TPlanFragmentExecParams;
import com.starrocks.thrift.TQueryOptions;
import com.starrocks.thrift.TQueryQueueOptions;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -150,6 +151,14 @@ public void toThriftFromCommonParams(TExecPlanFragmentParams result,
result.adaptive_dop_param.setMax_output_amplification_factor(
sessionVariable.getAdaptiveDopMaxOutputAmplificationFactor());
}
if (jobSpec.isEnableQueue()) {
TQueryQueueOptions queryQueueOptions = new TQueryQueueOptions();
queryQueueOptions.setEnable_global_query_queue(jobSpec.isEnableQueue());
queryQueueOptions.setEnable_group_level_query_queue(jobSpec.isEnableGroupLevelQueue());

TQueryOptions queryOptions = result.getQuery_options();
queryOptions.setQuery_queue_options(queryQueueOptions);
}
}
}
}
Expand Down
Loading