Skip to content

Commit

Permalink
[Enhancement] enable event scheduler by default (StarRocks#54827)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <drfeng08@gmail.com>
  • Loading branch information
stdpain authored Jan 23, 2025
1 parent 9870b05 commit e5375da
Show file tree
Hide file tree
Showing 27 changed files with 108 additions and 35 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/connector_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace starrocks {
class ConnectorScanner;

namespace pipeline {
class ConnectorScanOperatorMemShareArbitrator;
struct ConnectorScanOperatorMemShareArbitrator;
}

class ConnectorScanNode final : public starrocks::ScanNode {
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuil
exchange_source_op->set_degree_of_parallelism(context->degree_of_parallelism());
operators.emplace_back(exchange_source_op);
} else {
if (_is_parallel_merge || _sort_exec_exprs.is_constant_lhs_ordering()) {
if ((_is_parallel_merge || _sort_exec_exprs.is_constant_lhs_ordering()) &&
!_sort_exec_exprs.lhs_ordering_expr_ctxs().empty()) {
auto exchange_merge_sort_source_operator = std::make_shared<ExchangeParallelMergeSourceOperatorFactory>(
context->next_operator_id(), id(), _num_senders, _input_row_desc, &_sort_exec_exprs, _is_asc_order,
_nulls_first, _offset, _limit);
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {
_probe_observable.add_observer(state, observer);
}

// build status changed. notify probe
auto defer_notify_probe() {
return DeferOp([this]() { _builder_observable.notify_source_observers(); });
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,12 @@ int64_t ExchangeSinkOperator::construct_brpc_attachment(const PTransmitChunkPara
return attachment_physical_bytes;
}

std::string ExchangeSinkOperator::get_name() const {
std::string finished = is_finished() ? "X" : "O";
return fmt::format("{}_{}_{}({}) {{ pending_finish:{} }}", _name, _plan_node_id, (void*)this, finished,
pending_finish());
}

ExchangeSinkOperatorFactory::ExchangeSinkOperatorFactory(
int32_t id, int32_t plan_node_id, std::shared_ptr<SinkBuffer> buffer, TPartitionType::type part_type,
const std::vector<TPlanFragmentDestination>& destinations, bool is_pipeline_level_shuffle,
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class ExchangeSinkOperator final : public Operator {
// Return the physical bytes of attachment.
int64_t construct_brpc_attachment(const PTransmitChunkParamsPtr& _chunk_request, butil::IOBuf& attachment);

std::string get_name() const override;

private:
bool _is_large_chunk(size_t sz) const {
// ref olap_scan_node.cpp release_large_columns
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ void FragmentContext::destroy_pass_through_chunk_buffer() {
Status FragmentContext::set_pipeline_timer(PipelineTimer* timer) {
_pipeline_timer = timer;
_timeout_task = new CheckFragmentTimeout(this);
timespec tm = butil::microseconds_to_timespec(butil::gettimeofday_us());
tm.tv_sec += runtime_state()->query_ctx()->get_query_expire_seconds();
timespec tm = butil::seconds_from_now(runtime_state()->query_ctx()->get_query_expire_seconds());
RETURN_IF_ERROR(_pipeline_timer->schedule(_timeout_task, tm));
return Status::OK();
}
Expand Down Expand Up @@ -470,6 +469,7 @@ Status FragmentContext::submit_all_timer() {
for (auto [delta_ns, task] : _rf_timeout_tasks) {
timespec abstime = tm;
abstime.tv_nsec += delta_ns;
butil::timespec_normalize(&abstime);
RETURN_IF_ERROR(_pipeline_timer->schedule(task, abstime));
}
return Status::OK();
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/pipeline/fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,14 @@ class FragmentContextManager {

void cancel(const Status& status);

template <class Caller>
void for_each_fragment(Caller&& caller) {
std::lock_guard guard(_lock);
for (auto& [_, fragment] : _fragment_contexts) {
caller(fragment);
}
}

private:
std::mutex _lock;
std::unordered_map<TUniqueId, FragmentContextPtr> _fragment_contexts;
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,8 @@ Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const Unifi
TRACE_SCHEDULE_LOG << src->get_name() << " " << src->support_event_scheduler();
TRACE_SCHEDULE_LOG << sink->get_name() << " " << sink->support_event_scheduler();
});

// TODO: using observer implement wait dependencs event
all_support_event_scheduler = all_support_event_scheduler && !runtime_state->enable_wait_dependent_event();
if (all_support_event_scheduler) {
_fragment_ctx->init_event_scheduler();
RETURN_IF_ERROR(_fragment_ctx->set_pipeline_timer(exec_env->pipeline_timer()));
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/nljoin/nljoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ Status NLJoinProbeOperator::reset_state(starrocks::RuntimeState* state, const st
}

bool NLJoinProbeOperator::has_output() const {
_check_post_probe();
return _join_stage != JoinStage::Finished;
}

Expand Down Expand Up @@ -603,6 +602,7 @@ Status NLJoinProbeOperator::_permute_right_join(size_t chunk_size) {
// 2. Apply the conjuncts, and append it to output buffer
// 3. Maintain match index and implement left join and right join
StatusOr<ChunkPtr> NLJoinProbeOperator::pull_chunk(RuntimeState* state) {
_check_post_probe();
size_t chunk_size = state->chunk_size();

if (_join_op == TJoinOp::INNER_JOIN) {
Expand Down
13 changes: 8 additions & 5 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ Status PipelineDriver::prepare(RuntimeState* runtime_state) {
_global_rf_wait_timeout_ns = std::max(_global_rf_wait_timeout_ns, op->global_rf_wait_timeout_ns());
}
}
if (!_global_rf_descriptors.empty() && runtime_state->enable_event_scheduler()) {
_fragment_ctx->add_timer_observer(observer(), _global_rf_wait_timeout_ns);
}

if (!all_local_rf_set.empty()) {
_runtime_profile->add_info_string("LocalRfWaitingSet", strings::Substitute("$0", all_local_rf_set.size()));
Expand Down Expand Up @@ -575,6 +578,7 @@ void PipelineDriver::cancel_operators(RuntimeState* runtime_state) {
WARN_IF_ERROR(_mark_operator_cancelled(op, runtime_state),
fmt::format("cancel pipeline driver error [driver={}]", to_readable_string()));
}
_is_operator_cancelled = true;
}

void PipelineDriver::_close_operators(RuntimeState* runtime_state) {
Expand Down Expand Up @@ -735,11 +739,10 @@ void PipelineDriver::_update_global_rf_timer() {
if (!_runtime_state->enable_event_scheduler()) {
return;
}
auto timer = std::make_unique<RFScanWaitTimeout>(_fragment_ctx);
auto timer = std::make_unique<RFScanWaitTimeout>(_fragment_ctx, true);
timer->add_observer(_runtime_state, &_observer);
_global_rf_timer = std::move(timer);
timespec abstime = butil::microseconds_to_timespec(butil::gettimeofday_us());
abstime.tv_nsec += _global_rf_wait_timeout_ns;
timespec abstime = butil::nanoseconds_from_now(_global_rf_wait_timeout_ns);
WARN_IF_ERROR(_fragment_ctx->pipeline_timer()->schedule(_global_rf_timer.get(), abstime), "schedule:");
}

Expand All @@ -752,8 +755,8 @@ std::string PipelineDriver::to_readable_string() const {
ss << "query_id=" << (this->_query_ctx == nullptr ? "None" : print_id(this->query_ctx()->query_id()))
<< " fragment_id="
<< (this->_fragment_ctx == nullptr ? "None" : print_id(this->fragment_ctx()->fragment_instance_id()))
<< " driver=" << _driver_name << ", status=" << ds_to_string(this->driver_state()) << block_reasons
<< ", operator-chain: [";
<< " driver=" << _driver_name << " addr=" << this << ", status=" << ds_to_string(this->driver_state())
<< block_reasons << ", operator-chain: [";
for (size_t i = 0; i < _operators.size(); ++i) {
if (i == 0) {
ss << _operators[i]->get_name();
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class PipelineDriver {
_output_full_timer_sw->reset();
break;
case DriverState::PRECONDITION_BLOCK:
DCHECK_EQ(_state, DriverState::READY);
_precondition_block_timer_sw->reset();
break;
case DriverState::PENDING_FINISH:
Expand Down Expand Up @@ -393,6 +394,8 @@ class PipelineDriver {
}
}

void set_all_global_rf_timeout() { _all_global_rf_ready_or_timeout = true; }

bool has_precondition() const {
return !_local_rf_holders.empty() || !_dependencies.empty() || !_global_rf_descriptors.empty();
}
Expand Down Expand Up @@ -505,6 +508,7 @@ class PipelineDriver {

PipelineObserver* observer() { return &_observer; }
void assign_observer();
bool is_operator_cancelled() const { return _is_operator_cancelled; }

protected:
PipelineDriver()
Expand Down Expand Up @@ -590,6 +594,8 @@ class PipelineDriver {

std::atomic<bool> _has_log_cancelled{false};

std::atomic<bool> _is_operator_cancelled{false};

PipelineObserver _observer;

std::unique_ptr<PipelineTimerTask> _global_rf_timer;
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>

#include "exec/pipeline/pipeline_fwd.h"
#include "runtime/exec_env.h"
#include "util/time_guard.h"

namespace starrocks::pipeline {
Expand Down Expand Up @@ -263,6 +264,17 @@ void PipelineDriverPoller::on_cancel(DriverRawPtr driver, std::vector<DriverRawP
}

void PipelineDriverPoller::for_each_driver(const ConstDriverConsumer& call) const {
auto* env = ExecEnv::GetInstance();
env->query_context_mgr()->for_each_active_ctx([&call](const QueryContextPtr& ctx) {
ctx->fragment_mgr()->for_each_fragment([&call](const FragmentContextPtr& fragment) {
fragment->iterate_drivers([&call](const std::shared_ptr<PipelineDriver>& driver) {
if (driver->is_in_blocked()) {
call(driver.get());
}
});
});
});

std::shared_lock guard(_local_mutex);
for (auto* driver : _local_blocked_drivers) {
call(driver);
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,4 +697,16 @@ void QueryContextManager::report_fragments(
}
}

void QueryContextManager::for_each_active_ctx(const std::function<void(QueryContextPtr)>& func) {
for (auto i = 0; i < _num_slots; ++i) {
auto& mutex = _mutexes[i];
std::vector<QueryContextPtr> del_list;
std::unique_lock write_lock(mutex);
auto& contexts = _context_maps[i];
for (auto& [_, context] : contexts) {
func(context);
}
}
}

} // namespace starrocks::pipeline
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ using std::chrono::milliseconds;
using std::chrono::steady_clock;
using std::chrono::duration_cast;

class ConnectorScanOperatorMemShareArbitrator;
struct ConnectorScanOperatorMemShareArbitrator;

// The context for all fragment of one query in one BE
class QueryContext : public std::enable_shared_from_this<QueryContext> {
Expand Down Expand Up @@ -393,6 +393,7 @@ class QueryContextManager {

void collect_query_statistics(const PCollectQueryStatisticsRequest* request,
PCollectQueryStatisticsResult* response);
void for_each_active_ctx(const std::function<void(QueryContextPtr)>& func);

private:
static void _clean_func(QueryContextManager* manager);
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/pipeline/scan/chunk_buffer_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "exec/pipeline/scan/chunk_buffer_limiter.h"

#include <atomic>

#include "glog/logging.h"

namespace starrocks::pipeline {
Expand Down Expand Up @@ -48,8 +50,9 @@ ChunkBufferTokenPtr DynamicChunkBufferLimiter::pin(int num_chunks) {

void DynamicChunkBufferLimiter::unpin(int num_chunks) {
int prev_value = _pinned_chunks_counter.fetch_sub(num_chunks);
if (prev_value >= _capacity && !is_full()) {
if ((prev_value >= _capacity || _returned_full_event.load(std::memory_order_acquire)) && !is_full()) {
_has_full_event = true;
_returned_full_event = false;
}
DCHECK_GE(prev_value, 1);
}
Expand Down
10 changes: 9 additions & 1 deletion be/src/exec/pipeline/scan/chunk_buffer_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,13 @@ class DynamicChunkBufferLimiter final : public ChunkBufferLimiter {
ChunkBufferTokenPtr pin(int num_chunks) override;
void unpin(int num_chunks);

bool is_full() const override { return _pinned_chunks_counter >= _capacity; }
bool is_full() const override {
if (_pinned_chunks_counter >= _capacity) {
_returned_full_event.store(true, std::memory_order_release);
return true;
}
return false;
}
size_t size() const override { return _pinned_chunks_counter; }
size_t capacity() const override { return _capacity; }
size_t default_capacity() const override { return _default_capacity; }
Expand Down Expand Up @@ -154,6 +160,8 @@ class DynamicChunkBufferLimiter final : public ChunkBufferLimiter {
std::atomic<int> _pinned_chunks_counter = 0;

std::atomic<bool> _has_full_event{};

mutable std::atomic<bool> _returned_full_event{};
};

} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class ConnectorScanOperatorFactory : public ScanOperatorFactory {

~ConnectorScanOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

Status do_prepare(RuntimeState* state) override;
void do_close(RuntimeState* state) override;
OperatorPtr do_create(int32_t dop, int32_t driver_sequence) override;
Expand Down
17 changes: 3 additions & 14 deletions be/src/exec/pipeline/schedule/event_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ void EventScheduler::try_schedule(const DriverRawPtr driver) {

// The logic in the pipeline poller is basically the same.
auto fragment_ctx = driver->fragment_ctx();
if (fragment_ctx->is_canceled()) {
add_to_ready_queue = on_cancel(driver);
if (fragment_ctx->is_canceled() && !driver->is_operator_cancelled()) {
add_to_ready_queue = true;
} else if (driver->need_report_exec_state()) {
add_to_ready_queue = true;
} else if (driver->pending_finish()) {
Expand All @@ -59,7 +59,7 @@ void EventScheduler::try_schedule(const DriverRawPtr driver) {
auto status_or_is_not_blocked = driver->is_not_blocked();
if (!status_or_is_not_blocked.ok()) {
fragment_ctx->cancel(status_or_is_not_blocked.status());
add_to_ready_queue = on_cancel(driver);
add_to_ready_queue = true;
} else if (status_or_is_not_blocked.value()) {
driver->set_driver_state(DriverState::READY);
add_to_ready_queue = true;
Expand All @@ -73,15 +73,4 @@ void EventScheduler::try_schedule(const DriverRawPtr driver) {
_driver_queue->put_back(driver);
}
}

bool EventScheduler::on_cancel(DriverRawPtr driver) {
driver->cancel_operators(driver->fragment_ctx()->runtime_state());
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
return false;
} else {
driver->set_driver_state(DriverState::CANCELED);
return true;
}
}
} // namespace starrocks::pipeline
2 changes: 0 additions & 2 deletions be/src/exec/pipeline/schedule/event_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class EventScheduler {

void try_schedule(const DriverRawPtr driver);

bool on_cancel(DriverRawPtr driver);

void attach_queue(DriverQueue* queue) {
if (_driver_queue == nullptr) {
_driver_queue = queue;
Expand Down
11 changes: 9 additions & 2 deletions be/src/exec/pipeline/schedule/observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ void PipelineObserver::_do_update(int event) {
auto sink = driver->sink_operator();
auto source = driver->source_operator();

if (auto state = driver->driver_state(); state == DriverState::INPUT_EMPTY || state == DriverState::OUTPUT_FULL) {
TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state()
if (!driver->is_finished() && !driver->pending_finish()) {
TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state() << " event:" << event
<< " in_block_queue:" << driver->is_in_blocked()
<< " source finished:" << source->is_finished()
<< " operator has output:" << source->has_output()
Expand Down Expand Up @@ -84,4 +84,11 @@ std::string Observable::to_string() const {
return str;
}

void Observable::notify_runtime_filter_timeout() {
for (auto* observer : _observers) {
observer->driver()->set_all_global_rf_timeout();
observer->source_trigger();
}
}

} // namespace starrocks::pipeline
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/schedule/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class Observable {
// Non-thread-safe, we only allow the need to do this in the fragment->prepare phase.
void add_observer(RuntimeState* state, PipelineObserver* observer) {
if (state->enable_event_scheduler()) {
DCHECK(observer != nullptr);
_observers.push_back(observer);
}
}
Expand All @@ -120,6 +121,8 @@ class Observable {
}
}

void notify_runtime_filter_timeout();

size_t num_observers() const { return _observers.size(); }

std::string to_string() const;
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/pipeline/schedule/timeout_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ void CheckFragmentTimeout::Run() {
}

void RFScanWaitTimeout::Run() {
_timeout.notify_source_observers();
if (_all_rf_timeout) {
_timeout.notify_runtime_filter_timeout();
} else {
_timeout.notify_source_observers();
}
}

} // namespace starrocks::pipeline
Loading

0 comments on commit e5375da

Please sign in to comment.