Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exec](pipeline) runtime filter wait time #34872

Merged
merged 3 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ class IRuntimeFilter {

bool has_remote_target() const { return _has_remote_target; }

bool has_local_target() const { return _has_local_target; }

bool is_ready() const {
return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) ||
(_enable_pipeline_exec &&
Expand Down
39 changes: 32 additions & 7 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ void RuntimeFilterTimer::call_ready() {
_parent->set_ready();
}

// should check rf timeout in two case:
// 1. the rf is ready just remove the wait queue
// 2. if the rf have local dependency, the rf should start wait when all local dependency is ready
bool RuntimeFilterTimer::should_be_check_timeout() {
if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) {
bool all_ready = true;
for (auto& dep : _local_runtime_filter_dependencies) {
if (!dep->ready()) {
all_ready = false;
break;
}
}
if (all_ready) {
_local_runtime_filter_dependencies.clear();
_registration_time = MonotonicMillis();
}
return all_ready;
}
return true;
}

void RuntimeFilterTimerQueue::start() {
while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);
Expand All @@ -135,14 +156,18 @@ void RuntimeFilterTimerQueue::start() {
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
} else if (it->should_be_check_timeout()) {
if (it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
}
} else {
new_que.push_back(std::move(it));
}
}
new_que.swap(_que);
Expand Down
17 changes: 12 additions & 5 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
BasicSharedState* shared_state() { return _shared_state; }
void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; }
virtual std::string debug_string(int indentation_level = 0);
bool ready() const { return _ready; }

// Start the watcher. We use it to count how long this dependency block the current pipeline task.
void start_watcher() { _watcher.start(); }
Expand Down Expand Up @@ -231,11 +232,19 @@ class RuntimeFilterTimer {
int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }

void set_local_runtime_filter_dependencies(
const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) {
_local_runtime_filter_dependencies = deps;
}

bool should_be_check_timeout();

private:
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _local_runtime_filter_dependencies;
std::mutex _lock;
const int64_t _registration_time;
int64_t _registration_time;
const int32_t _wait_time_ms;
};

Expand All @@ -258,11 +267,9 @@ struct RuntimeFilterTimerQueue {

~RuntimeFilterTimerQueue() = default;
RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { push(filter); }

void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) {
std::unique_lock<std::mutex> lc(_que_lock);
_que.push_back(filter);
_que.insert(_que.end(), filter.begin(), filter.end());
cv.notify_all();
}

Expand Down
25 changes: 22 additions & 3 deletions be/src/vec/exec/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
runtime_filter_dependencies,
const int id, const int node_id, const std::string& name) {
runtime_filter_dependencies.resize(_runtime_filter_descs.size());
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> runtime_filter_timers(
_runtime_filter_descs.size());
std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>
local_runtime_filter_dependencies;

for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter);
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
runtime_filter->set_filter_timer(filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
runtime_filter->set_filter_timer(runtime_filter_timers[i]);
if (runtime_filter->has_local_target()) {
local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]);
}
}

// The gloabl runtime filter timer need set local runtime filter dependencies.
// start to wait before the local runtime filter ready
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
if (!runtime_filter->has_local_target()) {
runtime_filter_timers[i]->set_local_runtime_filter_dependencies(
local_runtime_filter_dependencies);
}
}
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(
std::move(runtime_filter_timers));
}

Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
Expand Down
Loading