Skip to content

Commit

Permalink
[improvement](pipeline) task group scan entity (apache#19924) (apache…
Browse files Browse the repository at this point in the history
…#27040)

Co-authored-by: Lijia Liu <liutang123@yeah.net>
  • Loading branch information
2 people authored and gnehil committed Dec 4, 2023
1 parent 4a1aa68 commit 6f125b6
Show file tree
Hide file tree
Showing 26 changed files with 592 additions and 139 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ DEFINE_Bool(enable_fuzzy_mode, "false");
DEFINE_Bool(enable_debug_points, "false");

DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_mInt16(pipeline_short_query_timeout_s, "20");
DEFINE_mBool(enable_workload_group_for_scan, "false");

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ DECLARE_Bool(enable_fuzzy_mode);
DECLARE_Bool(enable_debug_points);

DECLARE_Int32(pipeline_executor_size);
DECLARE_mInt16(pipeline_short_query_timeout_s);
DECLARE_mBool(enable_workload_group_for_scan);

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ PipelineFragmentContext::PipelineFragmentContext(
_report_thread_active(false),
_report_status_cb(report_status_cb),
_is_report_on_cancel(true) {
if (_query_ctx->get_task_group()) {
_task_group_entity = _query_ctx->get_task_group()->task_entity();
}
_report_thread_future = _report_thread_promise.get_future();
_fragment_watcher.start();
}
Expand Down Expand Up @@ -689,7 +692,7 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
auto* scheduler = _exec_env->pipeline_task_scheduler();
if (get_task_group()) {
if (_task_group_entity) {
scheduler = _exec_env->pipeline_task_group_scheduler();
}
for (auto& task : _tasks) {
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ class RuntimeFilterMergeControllerEntity;
class TDataSink;
class TPipelineFragmentParams;

namespace taskgroup {
class TaskGroup;
} // namespace taskgroup

namespace pipeline {

class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFragmentContext> {
Expand Down Expand Up @@ -121,7 +117,9 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
return _exec_status;
}

taskgroup::TaskGroup* get_task_group() const { return _query_ctx->get_task_group(); }
taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
return _task_group_entity;
}

private:
Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
Expand Down Expand Up @@ -176,6 +174,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

std::shared_ptr<QueryContext> _query_ctx;

taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr;

std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;

MonotonicStopWatch _fragment_watcher;
Expand Down
10 changes: 4 additions & 6 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@

namespace doris {
class RuntimeState;
namespace taskgroup {
class TaskGroup;
} // namespace taskgroup
} // namespace doris

namespace doris::pipeline {
Expand Down Expand Up @@ -402,7 +399,8 @@ std::string PipelineTask::debug_string() {
_task_profile->pretty_print(&profile_ss, "");
fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
}
fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index,
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, state = {}]\noperators: ", (void*)this,
get_state_name(_cur_state));
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
Expand All @@ -422,8 +420,8 @@ std::string PipelineTask::debug_string() {
return fmt::to_string(debug_string_buffer);
}

taskgroup::TaskGroup* PipelineTask::get_task_group() const {
return _fragment_context->get_task_group();
taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const {
return _fragment_context->get_task_group_entity();
}

} // namespace doris::pipeline
9 changes: 4 additions & 5 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"
#include "exec/operator.h"
#include "pipeline.h"
#include "runtime/task_group/task_group.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
Expand All @@ -36,9 +37,6 @@ class RuntimeState;
namespace pipeline {
class PipelineFragmentContext;
} // namespace pipeline
namespace taskgroup {
class TaskGroup;
} // namespace taskgroup
} // namespace doris

namespace doris::pipeline {
Expand Down Expand Up @@ -107,6 +105,7 @@ inline const char* get_state_name(PipelineTaskState idx) {
}

class TaskQueue;
class PriorityTaskQueue;

// The class do the pipeline task. Minest schdule union by task scheduler
class PipelineTask {
Expand Down Expand Up @@ -185,11 +184,11 @@ class PipelineTask {

std::string debug_string();

taskgroup::TaskGroup* get_task_group() const;
taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;

void set_task_queue(TaskQueue* task_queue);

static constexpr auto THREAD_TIME_SLICE = 100'000'000L;
static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;

// 1 used for update priority queue
// note(wb) an ugly implementation, need refactor later
Expand Down
59 changes: 35 additions & 24 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "runtime/task_group/task_group.h"

namespace doris {
namespace pipeline {
Expand Down Expand Up @@ -55,7 +54,7 @@ void PriorityTaskQueue::close() {
_wait_task.notify_all();
}

PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) {
PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
Expand Down Expand Up @@ -94,12 +93,12 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return try_take_unprotected(is_steal);
return _try_take_unprotected(is_steal);
}

PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = try_take_unprotected(false);
auto task = _try_take_unprotected(false);
if (task) {
return task;
} else {
Expand All @@ -108,7 +107,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
} else {
_wait_task.wait(lock);
}
return try_take_unprotected(false);
return _try_take_unprotected(false);
}
}

Expand All @@ -131,6 +130,11 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
return Status::OK();
}

int PriorityTaskQueue::task_size() {
std::unique_lock<std::mutex> lock(_work_size_mutex);
return _total_task_size;
}

MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;

MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
Expand Down Expand Up @@ -201,9 +205,9 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
}

bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& rhs_ptr) const {
int64_t lhs_val = lhs_ptr->vruntime_ns();
int64_t rhs_val = rhs_ptr->vruntime_ns();
const taskgroup::TGPTEntityPtr& lhs_ptr, const taskgroup::TGPTEntityPtr& rhs_ptr) const {
auto lhs_val = lhs_ptr->vruntime_ns();
auto rhs_val = rhs_ptr->vruntime_ns();
if (lhs_val != rhs_val) {
return lhs_val < rhs_val;
} else {
Expand All @@ -217,7 +221,8 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
}
}

TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : TaskQueue(core_size) {}
TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
: TaskQueue(core_size), _min_tg_entity(nullptr) {}

TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;

Expand All @@ -237,9 +242,10 @@ Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {

template <bool from_executor>
Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
auto* entity = task->get_task_group()->task_entity();
task->put_in_runnable_queue();
auto* entity = task->get_task_group_entity();
std::unique_lock<std::mutex> lock(_rs_mutex);
entity->push_back(task);
entity->task_queue()->emplace(task);
if (_group_entities.find(entity) == _group_entities.end()) {
_enqueue_task_group<from_executor>(entity);
}
Expand All @@ -250,7 +256,7 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
// TODO pipeline support steal
PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
std::unique_lock<std::mutex> lock(_rs_mutex);
taskgroup::TGEntityPtr entity = nullptr;
taskgroup::TGPTEntityPtr entity = nullptr;
while (entity == nullptr) {
if (_closed) {
return nullptr;
Expand All @@ -268,11 +274,16 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
if (entity->task_size() == 1) {
_dequeue_task_group(entity);
}
return entity->take();
auto task = entity->task_queue()->front();
if (task) {
entity->task_queue()->pop();
task->pop_out_runnable_queue();
}
return task;
}

template <bool from_worker>
void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr tg_entity) {
_total_cpu_share += tg_entity->cpu_share();
if constexpr (!from_worker) {
/**
Expand All @@ -283,7 +294,9 @@ void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
auto old_v_ns = tg_entity->vruntime_ns();
auto* min_entity = _min_tg_entity.load();
if (min_entity) {
int64_t new_vruntime_ns = min_entity->vruntime_ns() - _ideal_runtime_ns(tg_entity) / 2;
auto min_tg_v = min_entity->vruntime_ns();
auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v;
if (new_vruntime_ns > old_v_ns) {
tg_entity->adjust_vruntime_ns(new_vruntime_ns);
}
Expand All @@ -297,7 +310,7 @@ void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
_update_min_tg();
}

void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr tg_entity) {
void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr tg_entity) {
_total_cpu_share -= tg_entity->cpu_share();
_group_entities.erase(tg_entity);
VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
Expand All @@ -317,12 +330,12 @@ void TaskGroupTaskQueue::_update_min_tg() {
}

// like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value.
int64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const {
uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const {
return PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / _total_cpu_share;
}

taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
taskgroup::TGEntityPtr res = nullptr;
taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
taskgroup::TGPTEntityPtr res = nullptr;
for (auto* entity : _group_entities) {
res = entity;
break;
Expand All @@ -332,8 +345,7 @@ taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() {

void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
std::unique_lock<std::mutex> lock(_rs_mutex);
auto* group = task->get_task_group();
auto* entity = group->task_entity();
auto* entity = task->get_task_group_entity();
auto find_entity = _group_entities.find(entity);
bool is_in_queue = find_entity != _group_entities.end();
VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in queue:" << is_in_queue;
Expand All @@ -348,15 +360,14 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen
}

void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TaskGroupPtr task_group) {
taskgroup::TGPTEntityPtr entity) {
std::unique_lock<std::mutex> lock(_rs_mutex);
auto* entity = task_group->task_entity();
bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
if (is_in_queue) {
_group_entities.erase(entity);
_total_cpu_share -= entity->cpu_share();
}
task_group->update_cpu_share_unlock(task_group_info);
entity->check_and_update_cpu_share(task_group_info);
if (is_in_queue) {
_group_entities.emplace(entity);
_total_cpu_share += entity->cpu_share();
Expand Down
Loading

0 comments on commit 6f125b6

Please sign in to comment.