diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7a1194d8a8fd0..b6d5e48bfe076 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -229,6 +229,7 @@ set(EXEC_FILES
     pipeline/aggregate/repeat/repeat_operator.cpp
     pipeline/analysis/analytic_sink_operator.cpp
     pipeline/analysis/analytic_source_operator.cpp
+    pipeline/bucket_process_operator.cpp
     pipeline/table_function_operator.cpp
     pipeline/assert_num_rows_operator.cpp
     pipeline/set/union_passthrough_operator.cpp
diff --git a/be/src/exec/aggregate/aggregate_blocking_node.cpp b/be/src/exec/aggregate/aggregate_blocking_node.cpp
index 97bf2cd31dfed..43f7732635cbe 100644
--- a/be/src/exec/aggregate/aggregate_blocking_node.cpp
+++ b/be/src/exec/aggregate/aggregate_blocking_node.cpp
@@ -14,6 +14,7 @@

 #include "exec/aggregate/aggregate_blocking_node.h"

+#include
 #include

@@ -26,14 +27,16 @@
 #include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
 #include "exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h"
 #include "exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h"
+#include "exec/pipeline/bucket_process_operator.h"
 #include "exec/pipeline/chunk_accumulate_operator.h"
-#include "exec/pipeline/exchange/exchange_source_operator.h"
+#include "exec/pipeline/exchange/local_exchange_source_operator.h"
 #include "exec/pipeline/limit_operator.h"
 #include "exec/pipeline/noop_sink_operator.h"
 #include "exec/pipeline/operator.h"
 #include "exec/pipeline/pipeline_builder.h"
 #include "exec/pipeline/spill_process_operator.h"
 #include "exec/sorted_streaming_aggregator.h"
+#include "gutil/casts.h"
 #include "runtime/current_thread.h"
 #include "simd/simd.h"

@@ -172,7 +175,8 @@ Status AggregateBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, boo

 template <class SourceFactory, class SinkFactory>
 pipeline::OpFactories AggregateBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
-                                                                    pipeline::PipelineBuilderContext* context) {
+                                                                    pipeline::PipelineBuilderContext* context,
+                                                                    bool per_bucket_optimize) {
     using namespace pipeline;
     auto workgroup = context->fragment_context()->workgroup();

@@ -206,11 +210,24 @@ pipeline::OpFactories AggregateBlockingNode::_decompose_to_pipeline(pipeline::Op
     // Initialize OperatorFactory's fields involving runtime filters.
     auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
     this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);
+    auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
+    if (per_bucket_optimize) {
+        agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
+                context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
+    }
+
     ops_with_sink.push_back(std::move(agg_sink_op));

     OpFactories ops_with_source;
     // Initialize OperatorFactory's fields involving runtime filters.
     this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);
+
+    if (per_bucket_optimize) {
+        auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
+                context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
+        context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
+        agg_source_op = std::move(bucket_source_operator);
+    }
     ops_with_source.push_back(std::move(agg_source_op));

     if (should_cache) {
@@ -230,6 +247,8 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
     auto& agg_node = _tnode.agg_node;

     bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
+    bool use_per_bucket_optimize =
+            _tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
     bool has_group_by_keys = agg_node.__isset.grouping_exprs && !_tnode.agg_node.grouping_exprs.empty();
     bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

@@ -264,19 +283,22 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
         }
     }

+    use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;
+
     OpFactories ops_with_source;
     if (sorted_streaming_aggregate) {
         ops_with_source = _decompose_to_pipeline<SortedAggregateStreamingSourceOperatorFactory,
-                                                 SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
+                                                 SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
     } else {
         if (runtime_state()->enable_spill() && runtime_state()->enable_agg_spill() && has_group_by_keys) {
-            ops_with_source =
-                    _decompose_to_pipeline<SpillableAggregateBlockingSourceOperatorFactory,
-                                           SpillableAggregateBlockingSinkOperatorFactory>(ops_with_sink, context);
+            ops_with_source = _decompose_to_pipeline<SpillableAggregateBlockingSourceOperatorFactory,
+                                                     SpillableAggregateBlockingSinkOperatorFactory>(
+                    ops_with_sink, context, use_per_bucket_optimize && has_group_by_keys);
         } else {
             ops_with_source = _decompose_to_pipeline<AggregateBlockingSourceOperatorFactory,
-                                                     AggregateBlockingSinkOperatorFactory>(ops_with_sink, context);
+                                                     AggregateBlockingSinkOperatorFactory>(
+                    ops_with_sink, context, use_per_bucket_optimize && has_group_by_keys);
         }
     }

diff --git a/be/src/exec/aggregate/aggregate_blocking_node.h b/be/src/exec/aggregate/aggregate_blocking_node.h
index 3c5f851b6627f..1fb155d7b9873 100644
--- a/be/src/exec/aggregate/aggregate_blocking_node.h
+++ b/be/src/exec/aggregate/aggregate_blocking_node.h
@@ -35,6 +35,6 @@ class AggregateBlockingNode final : public AggregateBaseNode {
 private:
     template <class SourceFactory, class SinkFactory>
     pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
-                                                 pipeline::PipelineBuilderContext* context);
+                                                 pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
 };
 } // namespace starrocks
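The wrapping above only works because the sink-side and source-side wrappers created for the same pipeline driver share one `BucketProcessContext`, handed out by the shared `BucketProcessContextFactory`. A minimal sketch of that sharing pattern, with simplified stand-in types rather than the real StarRocks factory API:

```cpp
#include <cstdio>
#include <map>
#include <memory>

// Shared per-driver state between a wrapped sink and its paired source.
struct BucketProcessContext {
    bool current_bucket_sink_finished = false;
};

// Hands out one context per driver sequence, so the sink wrapper and the
// source wrapper created with the same driver_sequence share state.
struct BucketProcessContextFactory {
    std::shared_ptr<BucketProcessContext> get_or_create(int driver_sequence) {
        auto& ctx = _ctxs[driver_sequence];
        if (ctx == nullptr) {
            ctx = std::make_shared<BucketProcessContext>();
        }
        return ctx;
    }
    std::map<int, std::shared_ptr<BucketProcessContext>> _ctxs;
};

int main() {
    BucketProcessContextFactory factory;
    // Both factories call get_or_create() with the same driver sequence.
    auto sink_ctx = factory.get_or_create(/*driver_sequence=*/0);
    auto source_ctx = factory.get_or_create(/*driver_sequence=*/0);
    sink_ctx->current_bucket_sink_finished = true; // sink finishes a bucket
    std::printf("source sees finished bucket: %d\n", (int)source_ctx->current_bucket_sink_finished);
    return 0;
}
```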
diff --git a/be/src/exec/aggregate/distinct_blocking_node.cpp b/be/src/exec/aggregate/distinct_blocking_node.cpp
index 4514d16e5ce31..e68263fcbd22c 100644
--- a/be/src/exec/aggregate/distinct_blocking_node.cpp
+++ b/be/src/exec/aggregate/distinct_blocking_node.cpp
@@ -24,7 +24,9 @@
 #include "exec/pipeline/aggregate/sorted_aggregate_streaming_sink_operator.h"
 #include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
 #include "exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h"
+#include "exec/pipeline/bucket_process_operator.h"
 #include "exec/pipeline/chunk_accumulate_operator.h"
+#include "exec/pipeline/exchange/local_exchange_source_operator.h"
 #include "exec/pipeline/limit_operator.h"
 #include "exec/pipeline/operator.h"
 #include "exec/pipeline/pipeline_builder.h"
@@ -128,7 +130,8 @@ Status DistinctBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool

 template <class SourceFactory, class SinkFactory>
 pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
-                                                                   pipeline::PipelineBuilderContext* context) {
+                                                                   pipeline::PipelineBuilderContext* context,
+                                                                   bool per_bucket_optimize) {
     using namespace pipeline;
     auto workgroup = context->fragment_context()->workgroup();

@@ -157,11 +160,24 @@ pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpF

     auto [agg_sink_op, agg_source_op] = operators_generator(false);

+    auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
+    if (per_bucket_optimize) {
+        agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
+                context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
+    }
+
     // Create a shared RefCountedRuntimeFilterCollector
     auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
     // Initialize OperatorFactory's fields involving runtime filters.
     this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);

+    if (per_bucket_optimize) {
+        auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
+                context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
+        context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
+        agg_source_op = std::move(bucket_source_operator);
+    }
+
     OpFactories ops_with_source;
     // Initialize OperatorFactory's fields involving runtime filters.
     this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);
@@ -186,6 +202,8 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe
     OpFactories ops_with_sink = _children[0]->decompose_to_pipeline(context);

     bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
+    bool use_per_bucket_optimize =
+            _tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
     bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

     auto try_interpolate_local_shuffle = [this, context](auto& ops) {
@@ -200,23 +218,24 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe
     if (!sorted_streaming_aggregate) {
         ops_with_sink = try_interpolate_local_shuffle(ops_with_sink);
     }
+    use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

     OpFactories ops_with_source;
     if (sorted_streaming_aggregate) {
         ops_with_source = _decompose_to_pipeline<SortedAggregateStreamingSourceOperatorFactory,
-                                                 SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
+                                                 SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
     } else {
         if (runtime_state()->enable_spill() && runtime_state()->enable_agg_distinct_spill()) {
             ops_with_source = _decompose_to_pipeline<SpillableAggregateDistinctBlockingSourceOperatorFactory,
-                                                     SpillableAggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink,
-                                                                                                            context);
+                                                     SpillableAggregateDistinctBlockingSinkOperatorFactory>(
+                    ops_with_sink, context, use_per_bucket_optimize);
         } else {
-            ops_with_source =
-                    _decompose_to_pipeline<AggregateDistinctBlockingSourceOperatorFactory,
-                                           AggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink, context);
+            ops_with_source = _decompose_to_pipeline<AggregateDistinctBlockingSourceOperatorFactory,
+                                                     AggregateDistinctBlockingSinkOperatorFactory>(
+                    ops_with_sink, context, use_per_bucket_optimize);
         }
     }

diff --git a/be/src/exec/aggregate/distinct_blocking_node.h b/be/src/exec/aggregate/distinct_blocking_node.h
index 0f64caa4665de..eaf0b2c5e69a0 100644
--- a/be/src/exec/aggregate/distinct_blocking_node.h
+++ b/be/src/exec/aggregate/distinct_blocking_node.h
@@ -33,6 +33,6 @@ class DistinctBlockingNode final : public AggregateBaseNode {
 private:
     template <class SourceFactory, class SinkFactory>
     pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
-                                                 pipeline::PipelineBuilderContext* context);
+                                                 pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
 };
 } // namespace starrocks
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 6cec8efc7d53f..b7881cf30dd34 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -30,6 +30,7 @@
 #include "exprs/expr_context.h"
 #include "exprs/runtime_filter_bank.h"
 #include "glog/logging.h"
+#include "gutil/casts.h"
 #include "runtime/current_thread.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -63,6 +64,10 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _sorted_by_keys_per_tablet = tnode.olap_scan_node.sorted_by_keys_per_tablet;
     }

+    if (tnode.olap_scan_node.__isset.output_chunk_by_bucket) {
+        _output_chunk_by_bucket = tnode.olap_scan_node.output_chunk_by_bucket;
+    }
+
     if (_olap_scan_node.__isset.bucket_exprs) {
         const auto& bucket_exprs = _olap_scan_node.bucket_exprs;
         _bucket_exprs.resize(bucket_exprs.size());
@@ -392,6 +397,13 @@ StatusOr OlapScanNode::convert_scan_range_to_morsel_qu
         morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
     }

+    if (output_chunk_by_bucket()) {
+        std::sort(morsels.begin(), morsels.end(), [](auto& l, auto& r) {
+            return down_cast<pipeline::ScanMorsel*>(l.get())->owner_id() <
+                   down_cast<pipeline::ScanMorsel*>(r.get())->owner_id();
+        });
+    }
+
     // None tablet to read shouldn't use tablet internal parallel.
     if (morsels.empty()) {
         return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index ac6696cd6ed83..b3828b550c3a3 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -95,6 +95,8 @@ class OlapScanNode final : public starrocks::ScanNode {
         return starrocks::ScanNode::io_tasks_per_scan_operator();
     }

+    bool output_chunk_by_bucket() const override { return _output_chunk_by_bucket; }
+
     const std::vector<ExprContext*>& bucket_exprs() const { return _bucket_exprs; }

 private:
@@ -198,6 +200,7 @@ class OlapScanNode final : public starrocks::ScanNode {
     std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;

     bool _sorted_by_keys_per_tablet = false;
+    bool _output_chunk_by_bucket = false;

     std::vector<ExprContext*> _bucket_exprs;
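The `std::sort` added above is what makes the per-bucket pipeline possible downstream: once morsels are ordered by `owner_id` (the bucket sequence when `output_chunk_by_bucket` is set, otherwise the tablet id), a scan driver sees every morsel of one bucket before any morsel of the next. A small illustration with a simplified morsel type (not the real `ScanMorsel`):

```cpp
#include <algorithm>
#include <cstdio>
#include <vector>

// Simplified stand-in for ScanMorsel: owner_id is the bucket sequence when
// chunks are emitted bucket by bucket, otherwise the tablet id.
struct Morsel {
    long tablet_id;
    long owner_id;
};

int main() {
    std::vector<Morsel> morsels = {{1001, 2}, {1002, 0}, {1003, 2}, {1004, 1}, {1005, 0}};
    // Mirrors the sort added in olap_scan_node.cpp: after sorting,
    // all morsels of one bucket are contiguous.
    std::sort(morsels.begin(), morsels.end(),
              [](const Morsel& l, const Morsel& r) { return l.owner_id < r.owner_id; });
    for (const auto& m : morsels) {
        std::printf("bucket=%ld tablet=%ld\n", m.owner_id, m.tablet_id);
    }
    return 0;
}
```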
diff --git a/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h b/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h
index 9d77c61214d7b..ce8972a3a6998 100644
--- a/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h
+++ b/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h
@@ -14,6 +14,7 @@

 #pragma once

+#include <atomic>
 #include

 #include "exec/aggregator.h"
@@ -54,7 +55,7 @@ class AggregateBlockingSinkOperator : public Operator {

 private:
     // Whether prev operator has no output
-    bool _is_finished = false;
+    std::atomic_bool _is_finished = false;
     // whether enable aggregate group by limit optimize
     bool _agg_group_by_with_limit = false;
 };
diff --git a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.cpp b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.cpp
index 233f2ea2fa6e1..d3847c8c8d282 100644
--- a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.cpp
+++ b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.cpp
@@ -110,6 +110,14 @@ Status SpillableAggregateBlockingSinkOperator::push_chunk(RuntimeState* state, c
     return Status::OK();
 }

+Status SpillableAggregateBlockingSinkOperator::reset_state(RuntimeState* state,
+                                                           const std::vector<ChunkPtr>& refill_chunks) {
+    _is_finished = false;
+    RETURN_IF_ERROR(_aggregator->spiller()->reset_state(state));
+    RETURN_IF_ERROR(AggregateBlockingSinkOperator::reset_state(state, refill_chunks));
+    return Status::OK();
+}
+
 Status SpillableAggregateBlockingSinkOperator::_spill_all_inputs(RuntimeState* state, const ChunkPtr& chunk) {
     // spill all data
     DCHECK(!_aggregator->is_none_group_by_exprs());
diff --git a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h
index c76bbe47d6dbf..7cd0bc878e2bd 100644
--- a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h
+++ b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h
@@ -57,6 +57,8 @@ class SpillableAggregateBlockingSinkOperator : public AggregateBlockingSinkOpera
         return 0;
     }

+    Status reset_state(RuntimeState* state, const std::vector<ChunkPtr>& refill_chunks) override;
+
 private:
     bool spilled() const { return _aggregator->spiller()->spilled(); }
diff --git a/be/src/exec/pipeline/bucket_process_operator.cpp b/be/src/exec/pipeline/bucket_process_operator.cpp
new file mode 100644
index 0000000000000..9a3f3fbeb95ee
--- /dev/null
+++ b/be/src/exec/pipeline/bucket_process_operator.cpp
@@ -0,0 +1,149 @@
+#include "exec/pipeline/bucket_process_operator.h"
+
+#include "exec/pipeline/operator.h"
+#include "exec/pipeline/pipeline_fwd.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+
+namespace starrocks::pipeline {
+
+Status BucketProcessContext::reset_operator_state(RuntimeState* state) {
+    RETURN_IF_ERROR(source->reset_state(state, {}));
+    RETURN_IF_ERROR(sink->reset_state(state, {}));
+    return Status::OK();
+}
+
+Status BucketProcessSinkOperator::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Operator::prepare(state));
+    RETURN_IF_ERROR(_ctx->sink->prepare(state));
+    return Status::OK();
+}
+
+void BucketProcessSinkOperator::close(RuntimeState* state) {
+    _ctx->sink->close(state);
+}
+
+bool BucketProcessSinkOperator::need_input() const {
+    if (_ctx->current_bucket_sink_finished) {
+        return false;
+    }
+    return _ctx->sink->need_input();
+}
+
+bool BucketProcessSinkOperator::is_finished() const {
+    return _ctx->all_input_finishing && _ctx->sink->is_finished();
+}
+
+Status BucketProcessSinkOperator::set_finishing(RuntimeState* state) {
+    _ctx->all_input_finishing = true;
+    bool token = _ctx->token;
+    if (!token && _ctx->token.compare_exchange_strong(token, true)) {
+        RETURN_IF_ERROR(_ctx->sink->set_finishing(state));
+        _ctx->current_bucket_sink_finished = true;
+    }
+
+    return Status::OK();
+}
+
+Status BucketProcessSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
+    auto info = chunk->owner_info();
+    if (!chunk->is_empty()) {
+        RETURN_IF_ERROR(_ctx->sink->push_chunk(state, chunk));
+    }
+    if (info.is_last_chunk()) {
+        RETURN_IF_ERROR(_ctx->sink->set_finishing(state));
+        _ctx->current_bucket_sink_finished = true;
+    }
+    return Status::OK();
+}
+
+Status BucketProcessSourceOperator::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Operator::prepare(state));
+    return _ctx->source->prepare(state);
+}
+
+bool BucketProcessSourceOperator::has_output() const {
+    return _ctx->current_bucket_sink_finished && (_ctx->source->has_output() || _ctx->source->is_finished());
+}
+bool BucketProcessSourceOperator::is_finished() const {
+    return _ctx->all_input_finishing && _ctx->source->is_finished();
+}
+Status BucketProcessSourceOperator::set_finished(RuntimeState* state) {
+    _ctx->finished = true;
+    RETURN_IF_ERROR(_ctx->source->set_finished(state));
+    return Status::OK();
+}
+void BucketProcessSourceOperator::close(RuntimeState* state) {
+    _ctx->source->close(state);
+}
+
+StatusOr<ChunkPtr> BucketProcessSourceOperator::pull_chunk(RuntimeState* state) {
+    ChunkPtr chunk;
+    if (_ctx->source->has_output()) {
+        ASSIGN_OR_RETURN(chunk, _ctx->source->pull_chunk(state));
+    }
+
+    if (!_ctx->all_input_finishing && _ctx->source->is_finished()) {
+        bool token = _ctx->token;
+        if (!token && _ctx->token.compare_exchange_strong(token, true)) {
+            RETURN_IF_ERROR(_ctx->reset_operator_state(state));
+            if (_ctx->all_input_finishing) {
+                RETURN_IF_ERROR(_ctx->sink->set_finishing(state));
+                _ctx->current_bucket_sink_finished = true;
+            } else {
+                _ctx->current_bucket_sink_finished = false;
+            }
+            _ctx->token = false;
+        }
+    }
+
+    return chunk;
+}
+
+BucketProcessSinkOperatorFactory::BucketProcessSinkOperatorFactory(
+        int32_t id, int32_t plan_node_id, const BucketProcessContextFactoryPtr& context_factory,
+        const OperatorFactoryPtr& factory)
+        : OperatorFactory(id, "bucket_process_sink_factory", plan_node_id),
+          _factory(factory),
+          _ctx_factory(context_factory) {}
+
+OperatorPtr BucketProcessSinkOperatorFactory::create(int32_t degree_of_parallelism, int32_t driver_sequence) {
+    auto ctx = _ctx_factory->get_or_create(driver_sequence);
+    ctx->sink = _factory->create(degree_of_parallelism, driver_sequence);
+    auto bucket_sink_operator =
+            std::make_shared<BucketProcessSinkOperator>(this, _id, _plan_node_id, driver_sequence, ctx);
+    return bucket_sink_operator;
+}
+
+Status BucketProcessSinkOperatorFactory::prepare(RuntimeState* state) {
+    return _factory->prepare(state);
+}
+
+void BucketProcessSinkOperatorFactory::close(RuntimeState* state) {
+    _factory->close(state);
+}
+
+BucketProcessSourceOperatorFactory::BucketProcessSourceOperatorFactory(
+        int32_t id, int32_t plan_node_id, const BucketProcessContextFactoryPtr& context_factory,
+        const OperatorFactoryPtr& factory)
+        : SourceOperatorFactory(id, "bucket_process_factory", plan_node_id),
+          _factory(factory),
+          _ctx_factory(context_factory) {}
+
+OperatorPtr BucketProcessSourceOperatorFactory::create(int32_t degree_of_parallelism, int32_t driver_sequence) {
+    auto ctx = _ctx_factory->get_or_create(driver_sequence);
+    ctx->source = _factory->create(degree_of_parallelism, driver_sequence);
+    auto bucket_source_operator =
+            std::make_shared<BucketProcessSourceOperator>(this, _id, _plan_node_id, driver_sequence, ctx);
+    return bucket_source_operator;
+}
+
+Status BucketProcessSourceOperatorFactory::prepare(RuntimeState* state) {
+    return _factory->prepare(state);
+}
+
+void BucketProcessSourceOperatorFactory::close(RuntimeState* state) {
+    _factory->close(state);
+}
+
+} // namespace starrocks::pipeline
\ No newline at end of file
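`BucketProcessSinkOperator::set_finishing` and `BucketProcessSourceOperator::pull_chunk` can race to transition the shared context between buckets, so both serialize on `_ctx->token` with `compare_exchange_strong`: only the winner resets the wrapped operators or performs the final `set_finishing`. A compact, runnable model of that handshake (a plain `std::atomic<bool>` standing in for the context's token; not the real operator code):

```cpp
#include <atomic>
#include <cstdio>

// Two parties (the sink's set_finishing and the source's pull_chunk) may both
// try to transition the shared context; the CAS on `token` guarantees that at
// most one of them performs the transition.
struct Context {
    std::atomic<bool> token{false};
    bool current_bucket_sink_finished = false;
};

bool try_transition(Context& ctx, const char* who) {
    bool token = ctx.token;
    if (!token && ctx.token.compare_exchange_strong(token, true)) {
        ctx.current_bucket_sink_finished = true; // the winner applies the state change
        std::printf("%s won the token\n", who);
        return true;
    }
    std::printf("%s lost the token\n", who);
    return false;
}

int main() {
    Context ctx;
    try_transition(ctx, "sink::set_finishing"); // wins
    try_transition(ctx, "source::pull_chunk");  // loses: token already taken
    return 0;
}
```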
diff --git a/be/src/exec/pipeline/operator.h b/be/src/exec/pipeline/operator.h
index 63ab02f383f4a..257289d010b27 100644
--- a/be/src/exec/pipeline/operator.h
+++ b/be/src/exec/pipeline/operator.h
@@ -14,6 +14,8 @@

 #pragma once

+#include
+
 #include "column/vectorized_fwd.h"
 #include "common/statusor.h"
 #include "exec/pipeline/runtime_filter_types.h"
@@ -92,6 +94,9 @@ class Operator {
     // Whether we could pull chunk from this operator
     virtual bool has_output() const = 0;

+    // return true if the operator should ignore an empty EOS chunk
+    virtual bool ignore_empty_eos() const { return true; }
+
     // Whether we could push chunk to this operator
     virtual bool need_input() const = 0;
diff --git a/be/src/exec/pipeline/pipeline.cpp b/be/src/exec/pipeline/pipeline.cpp
index 934e651a8203b..9179ebbd8281f 100644
--- a/be/src/exec/pipeline/pipeline.cpp
+++ b/be/src/exec/pipeline/pipeline.cpp
@@ -14,6 +14,7 @@

 #include "exec/pipeline/pipeline.h"

+#include "exec/pipeline/operator.h"
 #include "exec/pipeline/pipeline_driver.h"
 #include "exec/pipeline/scan/connector_scan_operator.h"
 #include "exec/pipeline/stream_pipeline_driver.h"
@@ -113,6 +114,11 @@ void Pipeline::setup_drivers_profile(const DriverPtr& driver) {
     for (int32_t i = operators.size() - 1; i >= 0; --i) {
         auto& curr_op = operators[i];
         driver->runtime_profile()->add_child(curr_op->runtime_profile(), true, nullptr);
+        if (curr_op->is_combinatorial_operator()) {
+            curr_op->for_each_child_operator([&](Operator* child) {
+                driver->runtime_profile()->add_child(child->runtime_profile(), true, nullptr);
+            });
+        }
     }
 }
diff --git a/be/src/exec/pipeline/pipeline_driver.cpp b/be/src/exec/pipeline/pipeline_driver.cpp
index d037881afdc93..9af7f1f7bf95f 100644
--- a/be/src/exec/pipeline/pipeline_driver.cpp
+++ b/be/src/exec/pipeline/pipeline_driver.cpp
@@ -21,12 +21,14 @@
 #include "exec/pipeline/exchange/exchange_sink_operator.h"
 #include "exec/pipeline/pipeline_driver_executor.h"
 #include "exec/pipeline/scan/olap_scan_operator.h"
+#include "exec/pipeline/scan/scan_operator.h"
 #include "exec/pipeline/source_operator.h"
 #include "exec/query_cache/cache_operator.h"
 #include "exec/query_cache/lane_arbiter.h"
 #include "exec/query_cache/multilane_operator.h"
 #include "exec/query_cache/ticket_checker.h"
 #include "exec/workgroup/work_group.h"
+#include "gutil/casts.h"
 #include "runtime/current_thread.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -98,19 +100,19 @@ Status PipelineDriver::prepare(RuntimeState* runtime_state) {
     DCHECK(_state == DriverState::NOT_READY);

     auto* source_op = source_operator();
+    const auto use_cache = _fragment_ctx->enable_cache();
+
     // attach ticket_checker to both ScanOperator and SplitMorselQueue
     auto should_attach_ticket_checker = (dynamic_cast<ScanOperator*>(source_op) != nullptr) &&
-                                        (dynamic_cast<SplitMorselQueue*>(_morsel_queue) != nullptr) &&
-                                        _fragment_ctx->enable_cache();
+                                        _morsel_queue != nullptr && _morsel_queue->could_attach_ticket_checker() &&
+                                        (use_cache || down_cast<ScanOperator*>(source_op)->output_chunk_by_bucket());

     if (should_attach_ticket_checker) {
         auto* scan_op = dynamic_cast<ScanOperator*>(source_op);
-        auto* split_morsel_queue = dynamic_cast<SplitMorselQueue*>(_morsel_queue);
         auto ticket_checker = std::make_shared<query_cache::TicketChecker>();
         scan_op->set_ticket_checker(ticket_checker);
-        split_morsel_queue->set_ticket_checker(ticket_checker);
+        _morsel_queue->set_ticket_checker(ticket_checker);
     }

-    const auto use_cache = _fragment_ctx->enable_cache();
     source_op->add_morsel_queue(_morsel_queue);
     // fill OperatorWithDependency instances into _dependencies from _operators.
     DCHECK(_dependencies.empty());
@@ -322,7 +324,7 @@ StatusOr PipelineDriver::process(RuntimeState* runtime_state, int w
         if (return_status.ok()) {
             if (maybe_chunk.value() &&
                 (maybe_chunk.value()->num_rows() > 0 ||
-                 (maybe_chunk.value()->owner_info().is_last_chunk() && is_multilane(next_op)))) {
+                 (maybe_chunk.value()->owner_info().is_last_chunk() && !next_op->ignore_empty_eos()))) {
                 size_t row_num = maybe_chunk.value()->num_rows();
                 if (UNLIKELY(row_num > runtime_state->chunk_size())) {
                     return Status::InternalError(
diff --git a/be/src/exec/pipeline/scan/chunk_source.cpp b/be/src/exec/pipeline/scan/chunk_source.cpp
index e7d430a9ca0b1..657c38348718a 100644
--- a/be/src/exec/pipeline/scan/chunk_source.cpp
+++ b/be/src/exec/pipeline/scan/chunk_source.cpp
@@ -54,7 +54,7 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_
     }

     int64_t time_spent_ns = 0;
-    auto [tablet_id, version] = _morsel->get_lane_owner_and_version();
+    auto [owner_id, version] = _morsel->get_lane_owner_and_version();
     for (size_t i = 0; i < batch_size && !state->is_cancelled(); ++i) {
         {
             SCOPED_RAW_TIMER(&time_spent_ns);
@@ -73,17 +73,17 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_
         if (!_status.ok()) {
             // end of file is normal case, need process chunk
             if (_status.is_end_of_file()) {
-                chunk->owner_info().set_owner_id(tablet_id, true);
+                chunk->owner_info().set_owner_id(owner_id, true);
                 _chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
             } else if (_status.is_time_out()) {
-                chunk->owner_info().set_owner_id(tablet_id, false);
+                chunk->owner_info().set_owner_id(owner_id, false);
                 _chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
                 _status = Status::OK();
             }
             break;
         }

-        chunk->owner_info().set_owner_id(tablet_id, false);
+        chunk->owner_info().set_owner_id(owner_id, false);
         _chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
     }
diff --git a/be/src/exec/pipeline/scan/morsel.cpp b/be/src/exec/pipeline/scan/morsel.cpp
index 5b8d441691fb0..87ecb16ec1947 100644
--- a/be/src/exec/pipeline/scan/morsel.cpp
+++ b/be/src/exec/pipeline/scan/morsel.cpp
@@ -14,6 +14,11 @@

 #include "exec/pipeline/scan/morsel.h"

+#include
+
+#include
+
+#include "common/statusor.h"
 #include "exec/olap_utils.h"
 #include "storage/chunk_helper.h"
 #include "storage/range.h"
@@ -71,6 +76,34 @@ IndividualMorselQueueFactory::IndividualMorselQueueFactory(std::map

+BucketSequenceMorselQueueFactory::BucketSequenceMorselQueueFactory(std::map<int, MorselQueuePtr>&& queue_per_driver_seq,
+                                                                   bool could_local_shuffle)
+        : _could_local_shuffle(could_local_shuffle) {
+    if (queue_per_driver_seq.empty()) {
+        _queue_per_driver_seq.emplace_back(pipeline::create_empty_morsel_queue());
+        return;
+    }
+
+    _queue_per_driver_seq.reserve(queue_per_driver_seq.size());
+    int max_dop = queue_per_driver_seq.rbegin()->first;
+    for (int i = 0; i <= max_dop; ++i) {
+        auto it = queue_per_driver_seq.find(i);
+        if (it == queue_per_driver_seq.end()) {
+            _queue_per_driver_seq.emplace_back(create_empty_morsel_queue());
+        } else {
+            _queue_per_driver_seq.emplace_back(std::make_unique<BucketSequenceMorselQueue>(std::move(it->second)));
+        }
+    }
+}
+
+size_t BucketSequenceMorselQueueFactory::num_original_morsels() const {
+    size_t total = 0;
+    for (const auto& queue : _queue_per_driver_seq) {
+        total += queue->num_original_morsels();
+    }
+    return total;
+}
+
 /// MorselQueue.
 std::vector<TInternalScanRange*> _convert_morsels_to_olap_scan_ranges(const Morsels& morsels) {
     std::vector<TInternalScanRange*> scan_ranges;
@@ -111,6 +144,65 @@ StatusOr<MorselPtr> FixedMorselQueue::try_get() {
     }
 }

+BucketSequenceMorselQueue::BucketSequenceMorselQueue(MorselQueuePtr&& morsel_queue)
+        : _morsel_queue(std::move(morsel_queue)) {}
+
+std::vector<TInternalScanRange*> BucketSequenceMorselQueue::olap_scan_ranges() const {
+    return _morsel_queue->olap_scan_ranges();
+}
+
+bool BucketSequenceMorselQueue::empty() const {
+    return _unget_morsel == nullptr && _morsel_queue->empty();
+}
+
+StatusOr<MorselPtr> BucketSequenceMorselQueue::try_get() {
+    if (_unget_morsel != nullptr) {
+        return std::move(_unget_morsel);
+    }
+    if (_morsel_queue->empty()) {
+        return nullptr;
+    }
+    ASSIGN_OR_RETURN(auto morsel, _morsel_queue->try_get());
+    auto* m = down_cast<ScanMorsel*>(morsel.get());
+    auto owner_id = m->owner_id();
+    ASSIGN_OR_RETURN(int64_t next_owner_id, _peek_sequence_id());
+    _ticket_checker->enter(owner_id, next_owner_id != owner_id);
+    _current_sequence = owner_id;
+    return morsel;
+}
+
+std::string BucketSequenceMorselQueue::name() const {
+    return fmt::format("partition_morsel_queue({})", _morsel_queue->name());
+}
+
+StatusOr<bool> BucketSequenceMorselQueue::ready_for_next() const {
+    if (_current_sequence < 0) {
+        return true;
+    }
+
+    ASSIGN_OR_RETURN(int64_t next_sequence_id, _peek_sequence_id());
+    if (next_sequence_id == _current_sequence) {
+        return true;
+    }
+
+    if (_ticket_checker->are_all_left(_current_sequence)) {
+        return true;
+    }
+
+    return false;
+}
+
+StatusOr<int64_t> BucketSequenceMorselQueue::_peek_sequence_id() const {
+    int64_t next_owner_id = -1;
+    if (!_morsel_queue->empty()) {
+        ASSIGN_OR_RETURN(auto next_morsel, _morsel_queue->try_get());
+        auto* next_scan_morsel = down_cast<ScanMorsel*>(next_morsel.get());
+        next_owner_id = next_scan_morsel->owner_id();
+        _morsel_queue->unget(std::move(next_morsel));
+    }
+    return next_owner_id;
+}
+
 std::vector<TInternalScanRange*> PhysicalSplitMorselQueue::olap_scan_ranges() const {
     return _convert_morsels_to_olap_scan_ranges(_morsels);
 }
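`BucketSequenceMorselQueue` is the heart of the scheduling change: `try_get` peeks the next morsel's owner id and registers an `enter` on the ticket checker with `is_last` set when the owner is about to change, and `ready_for_next` refuses to cross into the next bucket until every chunk of the current bucket has left the scan. A simplified, runnable simulation of that gating (toy `TicketChecker`, not the real bit-packed one):

```cpp
#include <cstdio>
#include <deque>
#include <map>

// Simplified simulation of BucketSequenceMorselQueue's gating: a morsel of
// bucket N+1 may only be picked up once every morsel of bucket N has both
// entered and left the scan.
struct TicketChecker {
    struct Ticket {
        int entered = 0;
        int left = 0;
        bool all_entered = false;
    };
    void enter(long id, bool is_last) {
        auto& t = _tickets[id];
        ++t.entered;
        if (is_last) t.all_entered = true;
    }
    void leave(long id) { ++_tickets[id].left; }
    bool are_all_left(long id) {
        auto& t = _tickets[id];
        return t.all_entered && t.entered == t.left;
    }
    std::map<long, Ticket> _tickets;
};

int main() {
    std::deque<long> morsels = {1, 1, 2}; // owner ids, already sorted by bucket
    TicketChecker checker;
    long current = -1;

    auto peek = [&]() -> long { return morsels.empty() ? -1 : morsels.front(); };
    auto ready_for_next = [&]() {
        return current < 0 || peek() == current || checker.are_all_left(current);
    };
    auto try_get = [&]() {
        long id = morsels.front();
        morsels.pop_front();
        checker.enter(id, /*is_last=*/peek() != id); // owner changes => last morsel of bucket
        current = id;
    };

    try_get(); // bucket 1, morsel #1
    try_get(); // bucket 1, morsel #2 (last)
    std::printf("can start bucket 2: %d\n", (int)ready_for_next()); // 0: chunks still in flight
    checker.leave(1);
    checker.leave(1); // all chunks of bucket 1 consumed
    std::printf("can start bucket 2: %d\n", (int)ready_for_next()); // 1
    return 0;
}
```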
diff --git a/be/src/exec/pipeline/scan/morsel.h b/be/src/exec/pipeline/scan/morsel.h
index 8e115332d4af9..39c802011a7d4 100644
--- a/be/src/exec/pipeline/scan/morsel.h
+++ b/be/src/exec/pipeline/scan/morsel.h
@@ -107,12 +107,15 @@ class ScanMorsel : public Morsel {
     ScanMorsel(int32_t plan_node_id, const TScanRange& scan_range)
             : Morsel(plan_node_id), _scan_range(std::make_unique<TScanRange>(scan_range)) {
         if (_scan_range->__isset.internal_scan_range) {
-            _tablet_id = _scan_range->internal_scan_range.tablet_id;
+            _owner_id = _scan_range->internal_scan_range.tablet_id;
             auto str_version = _scan_range->internal_scan_range.version;
             _version = strtol(str_version.c_str(), nullptr, 10);
+            _owner_id = _scan_range->internal_scan_range.__isset.bucket_sequence
+                                ? _scan_range->internal_scan_range.bucket_sequence
+                                : _owner_id;
         }
         if (_scan_range->__isset.binlog_scan_range) {
-            _tablet_id = _scan_range->binlog_scan_range.tablet_id;
+            _owner_id = _scan_range->binlog_scan_range.tablet_id;
         }
     }

@@ -126,12 +129,14 @@ class ScanMorsel : public Morsel {
     TInternalScanRange* get_olap_scan_range() { return &(_scan_range->internal_scan_range); }

     std::tuple<int64_t, int64_t> get_lane_owner_and_version() const override {
-        return std::tuple{_tablet_id, _version};
+        return std::tuple{_owner_id, _version};
     }

+    int64_t owner_id() const { return _owner_id; }
+
 private:
     std::unique_ptr<TScanRange> _scan_range;
-    int64_t _tablet_id = 0;
+    int64_t _owner_id = 0;
     int64_t _version = 0;
 };

@@ -214,6 +219,29 @@ class IndividualMorselQueueFactory final : public MorselQueueFactory {
     const bool _could_local_shuffle;
 };

+class BucketSequenceMorselQueueFactory final : public MorselQueueFactory {
+public:
+    BucketSequenceMorselQueueFactory(std::map<int, MorselQueuePtr>&& queue_per_driver_seq, bool could_local_shuffle);
+    ~BucketSequenceMorselQueueFactory() override = default;
+
+    MorselQueue* create(int driver_sequence) override {
+        DCHECK_LT(driver_sequence, _queue_per_driver_seq.size());
+        return _queue_per_driver_seq[driver_sequence].get();
+    }
+
+    size_t size() const override { return _queue_per_driver_seq.size(); }
+
+    size_t num_original_morsels() const override;
+
+    bool is_shared() const override { return false; }
+
+    bool could_local_shuffle() const override { return _could_local_shuffle; }
+
+private:
+    std::vector<MorselQueuePtr> _queue_per_driver_seq;
+    const bool _could_local_shuffle;
+};
+
 /// MorselQueue.
 class MorselQueue {
 public:
@@ -225,6 +253,8 @@ class MorselQueue {
     virtual void set_key_ranges(const std::vector<std::unique_ptr<OlapScanRange>>& key_ranges) {}
     virtual void set_tablets(const std::vector<TabletSharedPtr>& tablets) {}
     virtual void set_tablet_rowsets(const std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets) {}
+    virtual void set_ticket_checker(const query_cache::TicketCheckerPtr& ticket_checker) {}
+    virtual bool could_attach_ticket_checker() { return false; }

     virtual size_t num_original_morsels() const = 0;
     virtual size_t max_degree_of_parallelism() const = 0;
@@ -232,6 +262,7 @@ class MorselQueue {
     virtual StatusOr<MorselPtr> try_get() = 0;
     void unget(MorselPtr&& morsel);
     virtual std::string name() const = 0;
+    virtual StatusOr<bool> ready_for_next() const { return true; }

 protected:
     MorselPtr _unget_morsel = nullptr;
@@ -264,6 +295,35 @@ class FixedMorselQueue final : public MorselQueue {
     std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;
 };

+class BucketSequenceMorselQueue : public MorselQueue {
+public:
+    BucketSequenceMorselQueue(MorselQueuePtr&& morsel_queue);
+    std::vector<TInternalScanRange*> olap_scan_ranges() const override;
+
+    void set_tablet_rowsets(const std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets) override {
+        _morsel_queue->set_tablet_rowsets(tablet_rowsets);
+    }
+
+    void set_ticket_checker(const query_cache::TicketCheckerPtr& ticket_checker) override {
+        _ticket_checker = ticket_checker;
+    }
+    bool could_attach_ticket_checker() override { return true; }
+
+    size_t num_original_morsels() const override { return _morsel_queue->num_original_morsels(); }
+    size_t max_degree_of_parallelism() const override { return _morsel_queue->max_degree_of_parallelism(); }
+    bool empty() const override;
+    StatusOr<MorselPtr> try_get() override;
+    std::string name() const override;
+    StatusOr<bool> ready_for_next() const override;
+
+private:
+    StatusOr<int64_t> _peek_sequence_id() const;
+
+    int64_t _current_sequence = -1;
+    MorselQueuePtr _morsel_queue;
+    query_cache::TicketCheckerPtr _ticket_checker;
+};
+
 class SplitMorselQueue : public MorselQueue {
 public:
     SplitMorselQueue(Morsels&& morsels, int64_t degree_of_parallelism, int64_t splitted_scan_rows)
@@ -273,7 +333,10 @@ class SplitMorselQueue : public MorselQueue {
             _splitted_scan_rows(splitted_scan_rows) {}

     void set_tablets(const std::vector<TabletSharedPtr>& tablets) override { _tablets = tablets; }

-    void set_ticket_checker(const query_cache::TicketCheckerPtr& ticket_checker) { _ticket_checker = ticket_checker; }
+    void set_ticket_checker(const query_cache::TicketCheckerPtr& ticket_checker) override {
+        _ticket_checker = ticket_checker;
+    }
+    bool could_attach_ticket_checker() override { return true; }

 protected:
     void _inc_num_splits(bool is_last) {
diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp
index c9882c8926bba..95fae848d1458 100644
--- a/be/src/exec/pipeline/scan/scan_operator.cpp
+++ b/be/src/exec/pipeline/scan/scan_operator.cpp
@@ -17,6 +17,8 @@
 #include

 #include "column/chunk.h"
+#include "common/status.h"
+#include "common/statusor.h"
 #include "exec/olap_scan_node.h"
 #include "exec/pipeline/limit_operator.h"
 #include "exec/pipeline/pipeline_builder.h"
@@ -37,6 +39,7 @@ ScanOperator::ScanOperator(OperatorFactory* factory, int32_t id, int32_t driver_
         : SourceOperator(factory, id, scan_node->name(), scan_node->id(), false, driver_sequence),
           _scan_node(scan_node),
           _dop(dop),
+          _output_chunk_by_bucket(scan_node->output_chunk_by_bucket()),
           _io_tasks_per_scan_operator(scan_node->io_tasks_per_scan_operator()),
           _chunk_source_profiles(_io_tasks_per_scan_operator),
           _is_io_task_running(_io_tasks_per_scan_operator),
@@ -243,21 +246,21 @@ StatusOr<ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
     if (res != nullptr) {
         begin_pull_chunk(res);
         // for query cache mechanism, we should emit EOS chunk when we receive the last chunk.
-        auto [tablet_id, is_eos] = _should_emit_eos(res);
+        auto [owner_id, is_eos] = _should_emit_eos(res);
         eval_runtime_bloom_filters(res.get());
-        res->owner_info().set_owner_id(tablet_id, is_eos);
+        res->owner_info().set_owner_id(owner_id, is_eos);
     }

     return res;
 }

 std::tuple<int64_t, bool> ScanOperator::_should_emit_eos(const ChunkPtr& chunk) {
-    auto tablet_id = chunk->owner_info().owner_id();
+    auto owner_id = chunk->owner_info().owner_id();
     auto is_last_chunk = chunk->owner_info().is_last_chunk();
     if (is_last_chunk && _ticket_checker != nullptr) {
-        is_last_chunk = _ticket_checker->leave(tablet_id);
+        is_last_chunk = _ticket_checker->leave(owner_id);
     }
-    return {tablet_id, is_last_chunk};
+    return {owner_id, is_last_chunk};
 }

 int64_t ScanOperator::global_rf_wait_timeout_ns() const {
@@ -307,24 +310,18 @@ Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) {
     size = std::min(size, total_cnt);

     // pick up new chunk source.
-    for (int i = 0; i < size; i++) {
-        int idx = to_sched[i];
-        RETURN_IF_ERROR(_pickup_morsel(state, idx));
+    ASSIGN_OR_RETURN(auto morsel_ready, _morsel_queue->ready_for_next());
+    if (size > 0 && morsel_ready) {
+        for (int i = 0; i < size; i++) {
+            int idx = to_sched[i];
+            RETURN_IF_ERROR(_pickup_morsel(state, idx));
+        }
     }

     _peak_io_tasks_counter->set(_num_running_io_tasks);

     return Status::OK();
 }

-// this is a more efficient way to check if a weak_ptr has been initialized
-// ref: https://stackoverflow.com/a/45507610
-// after compiler optimization, it generates far fewer instructions than std::weak_ptr::expired() and std::weak_ptr::lock()
-// see: https://godbolt.org/z/16bWqqM5n
-inline bool is_uninitialized(const std::weak_ptr<QueryContext>& ptr) {
-    using wp = std::weak_ptr<QueryContext>;
-    return !ptr.owner_before(wp{}) && !wp{}.owner_before(ptr);
-}
-
 void ScanOperator::_close_chunk_source_unlocked(RuntimeState* state, int chunk_source_index) {
     if (_chunk_sources[chunk_source_index] != nullptr) {
         _chunk_sources[chunk_source_index]->close(state);
@@ -455,6 +452,10 @@ Status ScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_index)
         }
     });

+    // If the morsel queue is not ready for the next bucket, wait for the current bucket to finish and just return.
+    ASSIGN_OR_RETURN(auto ready, _morsel_queue->ready_for_next());
+    RETURN_IF(!ready, Status::OK());
+
     ASSIGN_OR_RETURN(auto morsel, _morsel_queue->try_get());

     if (_lane_arbiter != nullptr) {
diff --git a/be/src/exec/pipeline/scan/scan_operator.h b/be/src/exec/pipeline/scan/scan_operator.h
index a3585437bc2ca..1277174251df1 100644
--- a/be/src/exec/pipeline/scan/scan_operator.h
+++ b/be/src/exec/pipeline/scan/scan_operator.h
@@ -79,6 +79,7 @@ class ScanOperator : public SourceOperator {
     void set_query_ctx(const QueryContextPtr& query_ctx);

     virtual int available_pickup_morsel_count() { return _io_tasks_per_scan_operator; }
+    bool output_chunk_by_bucket() const { return _output_chunk_by_bucket; }

     void begin_pull_chunk(const ChunkPtr& res) {
         _op_pull_chunks += 1;
         _op_pull_rows += res->num_rows();
@@ -139,6 +140,7 @@ class ScanOperator : public SourceOperator {
 protected:
     ScanNode* _scan_node = nullptr;
     const int32_t _dop;
+    const bool _output_chunk_by_bucket;
     const int _io_tasks_per_scan_operator;

     // ScanOperator may do parallel scan, so each _chunk_sources[i] needs to hold
     // a profile indenpendently, to be more specificly, _chunk_sources[i] will go through
diff --git a/be/src/exec/query_cache/cache_operator.h b/be/src/exec/query_cache/cache_operator.h
index e9c4b29a2a471..4e50bfd22b352 100644
--- a/be/src/exec/query_cache/cache_operator.h
+++ b/be/src/exec/query_cache/cache_operator.h
@@ -64,6 +64,7 @@ class CacheOperator final : public pipeline::Operator {
         _multilane_operators = std::move(multilane_operators);
     }
     void set_scan_operator(pipeline::OperatorRawPtr scan_operator) { _scan_operator = scan_operator; }
+    bool ignore_empty_eos() const override { return false; }

 private:
     void _update_probe_metrics(int64_t, const std::vector<ChunkPtr>& chunks);
diff --git a/be/src/exec/query_cache/multilane_operator.h b/be/src/exec/query_cache/multilane_operator.h
index 32b636337dbfb..a994c5cdc6004 100644
--- a/be/src/exec/query_cache/multilane_operator.h
+++ b/be/src/exec/query_cache/multilane_operator.h
@@ -76,6 +76,7 @@ class MultilaneOperator final : public pipeline::Operator {
     pipeline::OperatorPtr get_internal_op(size_t i);

     void set_precondition_ready(starrocks::RuntimeState* state) override;
+    bool ignore_empty_eos() const override { return false; }

 private:
     StatusOr<ChunkPtr> _pull_chunk_from_lane(RuntimeState* state, Lane& lane, bool passthrough_mode);
diff --git a/be/src/exec/query_cache/ticket_checker.cpp b/be/src/exec/query_cache/ticket_checker.cpp
index 56dd3a803b649..5a9777b3b767a 100644
--- a/be/src/exec/query_cache/ticket_checker.cpp
+++ b/be/src/exec/query_cache/ticket_checker.cpp
@@ -39,4 +39,15 @@ bool TicketChecker::are_all_ready(TicketIdType id) {
     return (ticket.data & ALL_READY_BIT) != 0L;
 }

+bool TicketChecker::are_all_left(TicketIdType id) {
+    std::lock_guard<std::mutex> require_lock(_lock);
+    auto it = _tickets.find(id);
+    DCHECK(it != _tickets.end());
+    Ticket& ticket = it->second;
+    bool is_all_enter = (ticket.data & ALL_READY_BIT) == ALL_READY_BIT;
+    int64_t enter_count = ticket.data & ENTER_COUNT_BITS;
+    int64_t leave_count = (ticket.data & LEAVE_COUNT_BITS) >> LEAVE_COUNT_SHIFT;
+    return is_all_enter && (enter_count == leave_count);
+}
+
 } // namespace starrocks::query_cache
\ No newline at end of file
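`are_all_left` complements the existing `are_all_ready`: a ticket packs the enter count, the leave count, and an "all entered" flag into a single `int64_t`, and `are_all_left` is true once splitting has finished and every enter has a matching leave. A runnable sketch of that bookkeeping; the real `ENTER_COUNT_BITS` / `LEAVE_COUNT_*` / `ALL_READY_BIT` constants live in ticket_checker.h, so the bit widths below are assumptions for illustration only:

```cpp
#include <cstdint>
#include <cstdio>

// Illustrative bit layout (assumed, not the real constants):
// low 24 bits = enter count, next 24 bits = leave count, bit 62 = all entered.
constexpr int64_t ENTER_COUNT_BITS = (int64_t(1) << 24) - 1;
constexpr int64_t LEAVE_COUNT_SHIFT = 24;
constexpr int64_t LEAVE_COUNT_BITS = ((int64_t(1) << 24) - 1) << LEAVE_COUNT_SHIFT;
constexpr int64_t ALL_READY_BIT = int64_t(1) << 62;

int64_t enter(int64_t data, bool is_last) {
    data += 1; // bump enter count (low bits)
    if (is_last) data |= ALL_READY_BIT;
    return data;
}
int64_t leave(int64_t data) {
    return data + (int64_t(1) << LEAVE_COUNT_SHIFT); // bump leave count
}
bool are_all_left(int64_t data) {
    bool is_all_enter = (data & ALL_READY_BIT) == ALL_READY_BIT;
    int64_t enter_count = data & ENTER_COUNT_BITS;
    int64_t leave_count = (data & LEAVE_COUNT_BITS) >> LEAVE_COUNT_SHIFT;
    return is_all_enter && enter_count == leave_count;
}

int main() {
    int64_t t = 0;
    t = enter(t, false);
    t = enter(t, true); // last morsel of the bucket entered
    std::printf("%d\n", (int)are_all_left(t)); // 0: two chunks still in flight
    t = leave(t);
    t = leave(t);
    std::printf("%d\n", (int)are_all_left(t)); // 1
    return 0;
}
```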
diff --git a/be/src/exec/query_cache/ticket_checker.h b/be/src/exec/query_cache/ticket_checker.h
index 870359defb2f8..fab563215c345 100644
--- a/be/src/exec/query_cache/ticket_checker.h
+++ b/be/src/exec/query_cache/ticket_checker.h
@@ -62,6 +62,9 @@ class TicketChecker {
     // test if all_ready_bit is on, returning true means that morsel splitting is done.
     bool are_all_ready(TicketIdType id);

+    // test if all tickets with this id have both entered and left, i.e. the bucket is fully consumed.
+    bool are_all_left(TicketIdType id);
+
 private:
     DISALLOW_COPY_AND_MOVE(TicketChecker);
     std::unordered_map<TicketIdType, Ticket> _tickets;
diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp
index 79e33d43815c6..9548ee8ce149e 100644
--- a/be/src/exec/scan_node.cpp
+++ b/be/src/exec/scan_node.cpp
@@ -140,8 +140,13 @@ StatusOr ScanNode::convert_scan_range_to_morsel
         queue_per_driver_seq.emplace(dop, std::move(queue));
     }

-    return std::make_unique<pipeline::IndividualMorselQueueFactory>(std::move(queue_per_driver_seq),
-                                                                    /*could_local_shuffle*/ false);
+    if (output_chunk_by_bucket()) {
+        return std::make_unique<pipeline::BucketSequenceMorselQueueFactory>(std::move(queue_per_driver_seq),
+                                                                            /*could_local_shuffle*/ false);
+    } else {
+        return std::make_unique<pipeline::IndividualMorselQueueFactory>(std::move(queue_per_driver_seq),
+                                                                        /*could_local_shuffle*/ false);
+    }
 }
 }
diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h
index 60356643d053d..4b8b583da2e82 100644
--- a/be/src/exec/scan_node.h
+++ b/be/src/exec/scan_node.h
@@ -119,6 +119,7 @@ class ScanNode : public ExecNode {
     virtual int io_tasks_per_scan_operator() const { return _io_tasks_per_scan_operator; }

     virtual bool always_shared_scan() const { return false; }
+    virtual bool output_chunk_by_bucket() const { return false; }

     // TODO: support more share_scan strategy
     void enable_shared_scan(bool enable);
diff --git a/be/src/exec/spill/input_stream.h b/be/src/exec/spill/input_stream.h
index acb03f4bcec9d..fabd5bb221cac 100644
--- a/be/src/exec/spill/input_stream.h
+++ b/be/src/exec/spill/input_stream.h
@@ -68,6 +68,8 @@ class BlockGroup {
     StatusOr<InputStreamPtr> as_ordered_stream(RuntimeState* state, const SerdePtr& serde, Spiller* spiller,
                                                const SortExecExprs* sort_exprs, const SortDescs* sort_descs);

+    void clear() { _blocks.clear(); }
+
 private:
     std::vector<BlockPtr> _blocks;
 };
diff --git a/be/src/exec/spill/spiller.cpp b/be/src/exec/spill/spiller.cpp
index 9546d58c3ae77..2e8a1f14bf71d 100644
--- a/be/src/exec/spill/spiller.cpp
+++ b/be/src/exec/spill/spiller.cpp
@@ -114,6 +114,13 @@ void Spiller::update_spilled_task_status(Status&& st) {
     }
 }

+Status Spiller::reset_state(RuntimeState* state) {
+    _spilled_append_rows = 0;
+    _restore_read_rows = 0;
+    _block_group->clear();
+    return Status::OK();
+}
+
 std::vector<std::shared_ptr<SpillerReader>> Spiller::get_partition_spill_readers(
         const std::vector<const SpillPartitionInfo*>& partitions) {
     std::vector<std::shared_ptr<SpillerReader>> res;
diff --git a/be/src/exec/spill/spiller.h b/be/src/exec/spill/spiller.h
index 82823247dae22..5e90e28b91771 100644
--- a/be/src/exec/spill/spiller.h
+++ b/be/src/exec/spill/spiller.h
@@ -192,6 +192,8 @@ class Spiller : public std::enable_shared_from_this<Spiller> {
     BlockManager* block_manager() { return _block_manager; }
     const ChunkBuilder& chunk_builder() { return _chunk_builder; }

+    Status reset_state(RuntimeState* state);
+
 private:
     Status _acquire_input_stream(RuntimeState* state);
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/AggregationNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/AggregationNode.java
index 354caf2e8ea2c..dd97bb9edc1d9 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/AggregationNode.java
@@ -88,7 +88,8 @@ public class AggregationNode extends PlanNode {
     private String streamingPreaggregationMode = "auto";

     private boolean useSortAgg = false;
-
+    private boolean usePerBucketOptimize = false;
+
     private boolean withLocalShuffle = false;

     // identicallyDistributed meanings the PlanNode above OlapScanNode are cases as follows:
@@ -162,6 +163,15 @@ public void setUseSortAgg(boolean useSortAgg) {
         this.useSortAgg = useSortAgg;
     }

+    public void setUsePerBucketOptimize(boolean usePerBucketOptimize) {
+        this.usePerBucketOptimize = usePerBucketOptimize;
+    }
+
+    public void disablePhysicalPropertyOptimize() {
+        setUseSortAgg(false);
+        setUsePerBucketOptimize(false);
+    }
+
     @Override
     public void computeStats(Analyzer analyzer) {
     }
@@ -223,6 +233,7 @@ protected void toThrift(TPlanNode msg) {
             msg.agg_node.setSql_aggregate_functions(sqlAggFuncBuilder.toString());
         }
         msg.agg_node.setUse_sort_agg(useSortAgg);
+        msg.agg_node.setUse_per_bucket_optimize(usePerBucketOptimize);

         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
index 2c0845c8b60a6..cc8edec79399f 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
@@ -150,6 +150,8 @@ public class OlapScanNode extends ScanNode {
     private ArrayList<Long> scanTabletIds = Lists.newArrayList();
     private boolean isFinalized = false;
     private boolean isSortedByKeyPerTablet = false;
+    private boolean isOutputChunkByBucket = false;
+    private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();

     private List<Expr> bucketExprs = Lists.newArrayList();
     private List<Column> bucketColumns = Lists.newArrayList();
@@ -186,6 +188,15 @@ public void setIsSortedByKeyPerTablet(boolean isSortedByKeyPerTablet) {
         this.isSortedByKeyPerTablet = isSortedByKeyPerTablet;
     }

+    public void setIsOutputChunkByBucket(boolean isOutputChunkByBucket) {
+        this.isOutputChunkByBucket = isOutputChunkByBucket;
+    }
+
+    public void disablePhysicalPropertyOptimize() {
+        setIsSortedByKeyPerTablet(false);
+        setIsOutputChunkByBucket(false);
+    }
+
     public List<Long> getSelectedPartitionIds() {
         return selectedPartitionIds;
     }
@@ -392,6 +403,9 @@ public List updateScanRangeLocations(List
         List<Replica> replicas = allQueryableReplicas;
@@ -451,6 +465,9 @@ public void addScanRangeLocations(Partition partition,
         internalRange.setTablet_id(tabletId);
         internalRange.setPartition_id(partition.getId());
         internalRange.setRow_count(tablet.getRowCount(0));
+        if (isOutputChunkByBucket) {
+            internalRange.setBucket_sequence(tabletId2BucketSeq.get(tabletId));
+        }

         // random shuffle List && only collect one copy
         List<Replica> allQueryableReplicas = Lists.newArrayList();
@@ -832,6 +849,7 @@ protected void toThrift(TPlanNode msg) {
             msg.olap_scan_node.setUnused_output_column_name(unUsedOutputStringColumns);
         }
         msg.olap_scan_node.setSorted_by_keys_per_tablet(isSortedByKeyPerTablet);
+        msg.olap_scan_node.setOutput_chunk_by_bucket(isOutputChunkByBucket);

         if (!bucketExprs.isEmpty()) {
             msg.olap_scan_node.setBucket_exprs(Expr.treesToThrift(bucketExprs));
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java b/fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
index 2022e27f7ad33..ac1e54f96c0b9 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
@@ -61,6 +61,7 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -724,4 +725,18 @@ public PlanNode getLeftMostNode() {
         return node;
     }

+    public void reset() {
+        // Do nothing.
+    }
+
+    public void disablePhysicalDistributionOptimize() {
+        forEachNode(planRoot, PlanNode::disablePhysicalPropertyOptimize);
+    }
+
+    private void forEachNode(PlanNode root, Consumer<PlanNode> consumer) {
+        consumer.accept(root);
+        for (PlanNode child : root.getChildren()) {
+            forEachNode(child, consumer);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/PlanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/PlanNode.java
index c5948a608ed1a..5e9095b9e0c5f 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/PlanNode.java
@@ -988,4 +988,9 @@ public List<SlotId> getOutputSlotIds(DescriptorTable descriptorTable) {
     // 3. HashJoinNode: slotIds of both sides of Join equal conditions in semi join and inner join.
     public void collectEquivRelation(FragmentNormalizer normalizer) {
     }
+
+    // Disable optimizations that depend on the physical data order,
+    // e.g. sorted streaming aggregation and per-bucket compute.
+    public void disablePhysicalPropertyOptimize() {
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
index e3f7cd99b5531..15139f3d9cfd3 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
@@ -535,7 +535,6 @@ private List<List<TScanRangeParams>> splitScanRangeParamByRowCount(List
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java
new file mode 100644
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java
+/**
+ * <p> It first assigns scan ranges to workers, and then dispatches the scan ranges assigned to each worker
+ * to fragment instances.
+ */
+public class LocalFragmentAssignmentStrategy implements FragmentAssignmentStrategy {
+    private static final Logger LOG = LogManager.getLogger(LocalFragmentAssignmentStrategy.class);
+
+    private final ConnectContext connectContext;
+    private final WorkerProvider workerProvider;
+    private final boolean usePipeline;
+    private final boolean isLoadType;
+
+    private final Set<Integer> replicatedScanIds = Sets.newHashSet();
+
+    public LocalFragmentAssignmentStrategy(ConnectContext connectContext, WorkerProvider workerProvider, boolean usePipeline,
+                                           boolean isLoadType) {
+        this.connectContext = connectContext;
+        this.workerProvider = workerProvider;
+        this.usePipeline = usePipeline;
+        this.isLoadType = isLoadType;
+    }
+
+    @Override
+    public void assignFragmentToWorker(ExecutionFragment execFragment) throws UserException {
+        for (ScanNode scanNode : execFragment.getScanNodes()) {
+            assignScanRangesToWorker(execFragment, scanNode);
+        }
+
+        assignScanRangesToFragmentInstancePerWorker(execFragment);
+
+        // The fragment which only contains scan nodes without scan ranges,
+        // such as SchemaScanNode, is assigned to an arbitrary worker.
+        if (execFragment.getInstances().isEmpty()) {
+            long workerId = workerProvider.selectNextWorker();
+            ComputeNode worker = workerProvider.getWorkerById(workerId);
+            FragmentInstance instance = new FragmentInstance(worker, execFragment);
+            execFragment.addInstance(instance);
+        }
+    }
+
+    private void assignScanRangesToWorker(ExecutionFragment execFragment, ScanNode scanNode) throws UserException {
+        BackendSelector backendSelector = BackendSelectorFactory.create(
+                scanNode, isLoadType, execFragment, workerProvider, connectContext, replicatedScanIds);
+
+        backendSelector.computeScanRangeAssignment();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(execFragment.getScanRangeAssignment().toDebugString());
+        }
+    }
+
+    private void assignScanRangesToFragmentInstancePerWorker(ExecutionFragment execFragment) {
+        ColocatedBackendSelector.Assignment colocatedAssignment = execFragment.getColocatedAssignment();
+        boolean hasColocate = execFragment.isColocated()
+                && colocatedAssignment != null && !colocatedAssignment.getSeqToWorkerId().isEmpty();
+        boolean hasBucketShuffle = execFragment.isLocalBucketShuffleJoin() && colocatedAssignment != null;
+
+        if (hasColocate || hasBucketShuffle) {
+            assignScanRangesToColocateFragmentInstancePerWorker(execFragment,
+                    colocatedAssignment.getSeqToWorkerId(),
+                    colocatedAssignment.getSeqToScanRange());
+        } else {
+            assignScanRangesToNormalFragmentInstancePerWorker(execFragment);
+        }
+    }
+
+    /**
+     * This strategy assigns buckets to each driver sequence to avoid local shuffle.
+     * If the number of buckets assigned to a fragment instance is less than pipelineDop,
+     * pipelineDop will be set to num_buckets, which will reduce the degree of operator parallelism.
+     * Therefore, when there are few buckets (<=pipeline_dop/2), insert local shuffle instead of using this strategy
+     * to improve the degree of parallelism.
+     *
+     * @param scanRanges  The buckets assigned to a fragment instance.
+     * @param pipelineDop The expected pipelineDop.
+     * @return Whether using the strategy of assigning scanRanges to each driver sequence.
+     */
+    private boolean enableAssignScanRangesPerDriverSeq(List<TScanRangeParams> scanRanges, int pipelineDop) {
+        boolean enableTabletInternalParallel =
+                connectContext != null && connectContext.getSessionVariable().isEnableTabletInternalParallel();
+        return !enableTabletInternalParallel || scanRanges.size() > pipelineDop / 2;
+    }
+
+    private boolean enableAssignScanRangesPerDriverSeq(PlanFragment fragment, List<TScanRangeParams> scanRanges) {
+        if (!usePipeline) {
+            return false;
+        }
+
+        if (fragment.isForceAssignScanRangesPerDriverSeq()) {
+            return true;
+        }
+
+        return fragment.isAssignScanRangesPerDriverSeq() &&
+                enableAssignScanRangesPerDriverSeq(scanRanges, fragment.getPipelineDop());
+    }
+
+    private boolean needAddScanRanges(Set<Integer> visitedReplicatedScanIds, Integer scanId) {
+        if (!replicatedScanIds.contains(scanId)) {
+            return true;
+        }
+        return visitedReplicatedScanIds.add(scanId);
+    }
+
+    private void assignScanRangesToColocateFragmentInstancePerWorker(
+            ExecutionFragment execFragment,
+            Map<Integer, Long> bucketSeqToWorkerId,
+            ColocatedBackendSelector.BucketSeqToScanRange bucketSeqToScanRange) {
+        final PlanFragment fragment = execFragment.getPlanFragment();
+        final int parallelExecInstanceNum = fragment.getParallelExecNum();
+        final int pipelineDop = fragment.getPipelineDop();
+
+        // 1. Group the bucket sequences of this fragment by worker id.
+        Map<Long, List<Integer>> workerIdToBucketSeqs = bucketSeqToWorkerId.entrySet().stream()
+                .collect(Collectors.groupingBy(
+                        Map.Entry::getValue,
+                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())
+                ));
+
+        boolean assignPerDriverSeq = usePipeline && workerIdToBucketSeqs.values().stream()
+                .allMatch(bucketSeqs -> enableAssignScanRangesPerDriverSeq(bucketSeqs, pipelineDop));
+
+        if (!assignPerDriverSeq) {
+            // These optimizations depend on assignPerDriverSeq.
+            fragment.disablePhysicalDistributionOptimize();
+        }
+
+        workerIdToBucketSeqs.forEach((workerId, bucketSeqsOfWorker) -> {
+            ComputeNode worker = workerProvider.getWorkerById(workerId);
+
+            // 2. Split the bucket sequences of this worker into one group per fragment instance.
+            int expectedInstanceNum = Math.max(1, parallelExecInstanceNum);
+            List<List<Integer>> bucketSeqsPerInstance = ListUtil.splitBySize(bucketSeqsOfWorker, expectedInstanceNum);
+
+            // 3. Construct each fragment instance and add the scan ranges it should scan.
+            bucketSeqsPerInstance.forEach(bucketSeqsOfInstance -> {
+                FragmentInstance instance = new FragmentInstance(worker, execFragment);
+                execFragment.addInstance(instance);
+
+                // Record the replicated scan ids already seen by this instance, to avoid adding a
+                // replicated scan range repeatedly when it appears in different buckets.
+                Set<Integer> instanceReplicatedScanIds = new HashSet<>();
+
+                if (!assignPerDriverSeq) {
+                    bucketSeqsOfInstance.forEach(bucketSeq -> {
+                        instance.addBucketSeq(bucketSeq);
+                        bucketSeqToScanRange.get(bucketSeq).forEach((scanId, scanRanges) -> {
+                            if (needAddScanRanges(instanceReplicatedScanIds, scanId)) {
+                                instance.addScanRanges(scanId, scanRanges);
+                            }
+                        });
+                    });
+                } else {
+                    int expectedDop = Math.max(1, pipelineDop);
+                    List<List<Integer>> bucketSeqsPerDriverSeq = ListUtil.splitBySize(bucketSeqsOfInstance, expectedDop);
+
+                    instance.setPipelineDop(bucketSeqsPerDriverSeq.size());
+
+                    for (int driverSeq = 0; driverSeq < bucketSeqsPerDriverSeq.size(); driverSeq++) {
+                        int finalDriverSeq = driverSeq;
+                        bucketSeqsPerDriverSeq.get(driverSeq).forEach(bucketSeq -> {
+                            instance.addBucketSeqAndDriverSeq(bucketSeq, finalDriverSeq);
+                            bucketSeqToScanRange.get(bucketSeq).forEach((scanId, scanRanges) -> {
+                                if (needAddScanRanges(instanceReplicatedScanIds, scanId)) {
+                                    instance.addScanRanges(scanId, finalDriverSeq, scanRanges);
+                                }
+                            });
+                        });
+                    }
+
+                    instance.paddingScanRanges();
+                }
+            });
+        });
+    }
+
+    private void assignScanRangesToNormalFragmentInstancePerWorker(ExecutionFragment execFragment) {
+        final PlanFragment fragment = execFragment.getPlanFragment();
+        final int parallelExecInstanceNum = fragment.getParallelExecNum();
+        final int pipelineDop = fragment.getPipelineDop();
+
+        execFragment.getScanRangeAssignment().forEach((workerId, scanRangesPerWorker) -> {
+    private void assignScanRangesToNormalFragmentInstancePerWorker(ExecutionFragment execFragment) {
+        final PlanFragment fragment = execFragment.getPlanFragment();
+        final int parallelExecInstanceNum = fragment.getParallelExecNum();
+        final int pipelineDop = fragment.getPipelineDop();
+
+        execFragment.getScanRangeAssignment().forEach((workerId, scanRangesPerWorker) -> {
+            // 1. Handle the normal scan nodes first.
+            scanRangesPerWorker.forEach((scanId, scanRangesOfNode) -> {
+                if (replicatedScanIds.contains(scanId)) {
+                    return;
+                }
+
+                int expectedInstanceNum = Math.max(1, parallelExecInstanceNum);
+                List<List<TScanRangeParams>> scanRangesPerInstance = ListUtil.splitBySize(scanRangesOfNode, expectedInstanceNum);
+
+                for (List<TScanRangeParams> scanRanges : scanRangesPerInstance) {
+                    FragmentInstance instance = new FragmentInstance(workerProvider.getWorkerById(workerId), execFragment);
+                    execFragment.addInstance(instance);
+
+                    if (!enableAssignScanRangesPerDriverSeq(fragment, scanRanges)) {
+                        instance.addScanRanges(scanId, scanRanges);
+                    } else {
+                        int expectedDop = Math.max(1, Math.min(pipelineDop, scanRanges.size()));
+                        List<List<TScanRangeParams>> scanRangesPerDriverSeq;
+                        if (Config.enable_schedule_insert_query_by_row_count && isLoadType
+                                && !scanRanges.isEmpty()
+                                && scanRanges.get(0).getScan_range().isSetInternal_scan_range()) {
+                            scanRangesPerDriverSeq = splitScanRangeParamByRowCount(scanRanges, expectedDop);
+                        } else {
+                            scanRangesPerDriverSeq = ListUtil.splitBySize(scanRanges, expectedDop);
+                        }
+
+                        if (fragment.isForceAssignScanRangesPerDriverSeq() && scanRangesPerDriverSeq.size() != pipelineDop) {
+                            fragment.setPipelineDop(scanRangesPerDriverSeq.size());
+                        }
+                        instance.setPipelineDop(scanRangesPerDriverSeq.size());
+
+                        for (int driverSeq = 0; driverSeq < scanRangesPerDriverSeq.size(); ++driverSeq) {
+                            instance.addScanRanges(scanId, driverSeq, scanRangesPerDriverSeq.get(driverSeq));
+                        }
+                        instance.paddingScanRanges();
+                    }
+                }
+            });
+
+            // 2. Handle the replicated scan nodes, if any.
+            boolean isReplicated = execFragment.isReplicated();
+            if (isReplicated) {
+                scanRangesPerWorker.forEach((scanId, scanRangesPerNode) -> {
+                    if (!replicatedScanIds.contains(scanId)) {
+                        return;
+                    }
+
+                    for (FragmentInstance instance : execFragment.getInstances()) {
+                        instance.addScanRanges(scanId, scanRangesPerNode);
+                    }
+                });
+            }
+        });
+    }
+
+    /**
+     * Split scan range params into groupNum groups, balancing the groups by accumulated row count.
+     */
+    private static List<List<TScanRangeParams>> splitScanRangeParamByRowCount(List<TScanRangeParams> scanRangeParams,
+                                                                              int groupNum) {
+        List<List<TScanRangeParams>> result = new ArrayList<>(groupNum);
+        for (int i = 0; i < groupNum; i++) {
+            result.add(new ArrayList<>());
+        }
+        long[] dataSizePerGroup = new long[groupNum];
+        for (TScanRangeParams scanRangeParam : scanRangeParams) {
+            // Greedily put the next scan range into the group with the smallest row count so far.
+            int minIndex = 0;
+            long minDataSize = dataSizePerGroup[0];
+            for (int i = 1; i < groupNum; i++) {
+                if (dataSizePerGroup[i] < minDataSize) {
+                    minIndex = i;
+                    minDataSize = dataSizePerGroup[i];
+                }
+            }
+            dataSizePerGroup[minIndex] += Math.max(1, scanRangeParam.getScan_range().getInternal_scan_range().getRow_count());
+            result.get(minIndex).add(scanRangeParam);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("dataSizePerGroup: {}", dataSizePerGroup);
+        }
+
+        return result;
+    }
+}
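The splitter is a greedy balancer: every scan range goes to the group with the smallest accumulated row count so far, and a range with an unknown count weighs at least 1. A stripped-down, runnable version of the same strategy over plain row counts:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GreedySplitDemo {
    // Same greedy strategy as splitScanRangeParamByRowCount, but over plain row counts.
    static List<List<Long>> splitByRowCount(List<Long> rowCounts, int groupNum) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < groupNum; i++) {
            result.add(new ArrayList<>());
        }
        long[] sizePerGroup = new long[groupNum];
        for (long rowCount : rowCounts) {
            // Pick the group with the smallest accumulated row count.
            int minIndex = 0;
            for (int i = 1; i < groupNum; i++) {
                if (sizePerGroup[i] < sizePerGroup[minIndex]) {
                    minIndex = i;
                }
            }
            sizePerGroup[minIndex] += Math.max(1, rowCount);
            result.get(minIndex).add(rowCount);
        }
        System.out.println("sizes: " + Arrays.toString(sizePerGroup)); // e.g. [100, 30, 20]
        return result;
    }

    public static void main(String[] args) {
        // 3 driver sequences; the large 100-row range ends up alone in its group.
        System.out.println(splitByRowCount(List.of(100L, 10L, 10L, 10L, 10L, 10L), 3));
    }
}
```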
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java
index 32f376fe7e393..397da25825a9f 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java
@@ -65,6 +65,7 @@
 import com.starrocks.sql.optimizer.rule.tree.CloneDuplicateColRefRule;
 import com.starrocks.sql.optimizer.rule.tree.ExchangeSortToMergeRule;
 import com.starrocks.sql.optimizer.rule.tree.ExtractAggregateColumn;
+import com.starrocks.sql.optimizer.rule.tree.PhysicalDistributionAggOptRule;
 import com.starrocks.sql.optimizer.rule.tree.PreAggregateTurnOnRule;
 import com.starrocks.sql.optimizer.rule.tree.PredicateReorderRule;
 import com.starrocks.sql.optimizer.rule.tree.PruneAggregateNodeRule;
@@ -73,7 +74,6 @@
 import com.starrocks.sql.optimizer.rule.tree.PushDownAggregateRule;
 import com.starrocks.sql.optimizer.rule.tree.PushDownDistinctAggregateRule;
 import com.starrocks.sql.optimizer.rule.tree.ScalarOperatorsReuseRule;
-import com.starrocks.sql.optimizer.rule.tree.UseSortAggregateRule;
 import com.starrocks.sql.optimizer.rule.tree.prunesubfield.PruneSubfieldRule;
 import com.starrocks.sql.optimizer.rule.tree.prunesubfield.PushDownSubfieldRule;
 import com.starrocks.sql.optimizer.task.OptimizeGroupTask;
@@ -617,7 +617,7 @@ private OptExpression physicalRuleRewrite(TaskContext rootTaskContext, OptExpres
         result = new ExchangeSortToMergeRule().rewrite(result, rootTaskContext);
         result = new PruneAggregateNodeRule().rewrite(result, rootTaskContext);
         result = new PruneShuffleColumnRule().rewrite(result, rootTaskContext);
-        result = new UseSortAggregateRule().rewrite(result, rootTaskContext);
+        result = new PhysicalDistributionAggOptRule().rewrite(result, rootTaskContext);
         result = new AddDecodeNodeForDictStringRule().rewrite(result, rootTaskContext);
         // This rule should be last
         result = new ScalarOperatorsReuseRule().rewrite(result, rootTaskContext);
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalHashAggregateOperator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalHashAggregateOperator.java
index 56c737c69dbb8..b696f5f3092fe 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalHashAggregateOperator.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalHashAggregateOperator.java
@@ -73,6 +73,8 @@ public class PhysicalHashAggregateOperator extends PhysicalOperator {
 
     private boolean useSortAgg = false;
 
+    private boolean usePerBucketOptmize = false;
+
     private DataSkewInfo distinctColumnDataSkew = null;
 
     public PhysicalHashAggregateOperator(AggType type,
                                          List<ColumnRefOperator> groupBys,
@@ -168,6 +170,14 @@ public void setUseSortAgg(boolean useSortAgg) {
         this.useSortAgg = useSortAgg;
     }
 
+    public boolean isUsePerBucketOptmize() {
+        return usePerBucketOptmize;
+    }
+
+    public void setUsePerBucketOptmize(boolean usePerBucketOptmize) {
+        this.usePerBucketOptmize = usePerBucketOptmize;
+    }
+
     public void setDistinctColumnDataSkew(DataSkewInfo distinctColumnDataSkew) {
         this.distinctColumnDataSkew = distinctColumnDataSkew;
     }
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalOlapScanOperator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalOlapScanOperator.java
index ecb063169098d..a8a28545e1e9c 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalOlapScanOperator.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalOlapScanOperator.java
@@ -46,6 +46,7 @@ public class PhysicalOlapScanOperator extends PhysicalScanOperator {
     private boolean isPreAggregation;
     private String turnOffReason;
     protected boolean needSortedByKeyPerTablet = false;
+    protected boolean needOutputChunkByBucket = false;
 
     private boolean usePkIndex = false;
 
@@ -142,10 +143,18 @@ public boolean needSortedByKeyPerTablet() {
         return needSortedByKeyPerTablet;
     }
 
+    public boolean needOutputChunkByBucket() {
+        return needOutputChunkByBucket;
+    }
+
     public void setNeedSortedByKeyPerTablet(boolean needSortedByKeyPerTablet) {
         this.needSortedByKeyPerTablet = needSortedByKeyPerTablet;
     }
 
+    public void setNeedOutputChunkByBucket(boolean needOutputChunkByBucket) {
+        this.needOutputChunkByBucket = needOutputChunkByBucket;
+    }
+
     public boolean isUsePkIndex() {
         return usePkIndex;
     }
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/AddDecodeNodeForDictStringRule.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/AddDecodeNodeForDictStringRule.java
index c3e3a9eff134a..be3c447a2c403 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/AddDecodeNodeForDictStringRule.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/AddDecodeNodeForDictStringRule.java
@@ -469,6 +469,7 @@ public OptExpression visitPhysicalOlapScan(OptExpression optExpression, DecodeCo
                 // There need to set right output columns
                 newOlapScan.setOutputColumns(newOutputColumns);
                 newOlapScan.setNeedSortedByKeyPerTablet(scanOperator.needSortedByKeyPerTablet());
+                newOlapScan.setNeedOutputChunkByBucket(scanOperator.needOutputChunkByBucket());
 
                 OptExpression.Builder builder = OptExpression.buildWithOpAndInputs(newOlapScan, Lists.newArrayList());
                 builder.setLogicalProperty(rewriteLogicProperty(optExpression.getLogicalProperty(),
@@ -751,6 +752,7 @@ private PhysicalHashAggregateOperator rewriteAggOperator(PhysicalHashAggregateOp
                     aggOperator.getPredicate(), aggOperator.getProjection());
             newHashAggregator.setMergedLocalAgg(aggOperator.isMergedLocalAgg());
             newHashAggregator.setUseSortAgg(aggOperator.isUseSortAgg());
+            newHashAggregator.setUsePerBucketOptmize(aggOperator.isUsePerBucketOptmize());
             return newHashAggregator;
         }
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PhysicalDistributionAggOptRule.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PhysicalDistributionAggOptRule.java
new file mode 100644
index 0000000000000..1f4b6b41425fe
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/PhysicalDistributionAggOptRule.java
@@ -0,0 +1,130 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.sql.optimizer.rule.tree;
+
+import com.starrocks.catalog.Column;
+import com.starrocks.catalog.OlapTable;
+import com.starrocks.qe.ConnectContext;
+import com.starrocks.sql.optimizer.OptExpression;
+import com.starrocks.sql.optimizer.OptExpressionVisitor;
+import com.starrocks.sql.optimizer.operator.OperatorType;
+import com.starrocks.sql.optimizer.operator.physical.PhysicalHashAggregateOperator;
+import com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator;
+import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
+import com.starrocks.sql.optimizer.task.TaskContext;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PhysicalDistributionAggOptRule implements TreeRewriteRule {
+    @Override
+    public OptExpression rewrite(OptExpression root, TaskContext taskContext) {
+        if (ConnectContext.get().getSessionVariable().isEnableQueryCache()) {
+            return root;
+        }
+        if (ConnectContext.get().getSessionVariable().isEnableSortAggregate()) {
+            root.getOp().accept(new UseSortAGGRule(), root, null);
+            return root;
+        }
+        if (ConnectContext.get().getSessionVariable().isEnablePerBucketComputeOptimize()) {
+            root.getOp().accept(new UsePerBucketOptimizeRule(), root, null);
+            return root;
+        }
+        return root;
+    }
+
+    private static class NoopVisitor extends OptExpressionVisitor<Void, Void> {
+        @Override
+        public Void visit(OptExpression optExpression, Void context) {
+            for (OptExpression opt : optExpression.getInputs()) {
+                opt.getOp().accept(this, opt, context);
+            }
+            return null;
+        }
+    }
+
+    private static class UsePerBucketOptimizeRule extends NoopVisitor {
+        @Override
+        public Void visitPhysicalHashAggregate(OptExpression optExpression, Void context) {
+            if (optExpression.getInputs().get(0).getOp().getOpType() != OperatorType.PHYSICAL_OLAP_SCAN) {
+                return visit(optExpression, context);
+            }
+
+            PhysicalOlapScanOperator scan = (PhysicalOlapScanOperator) optExpression.getInputs().get(0).getOp();
+            PhysicalHashAggregateOperator agg = (PhysicalHashAggregateOperator) optExpression.getOp();
+
+            // Now we only support one-stage AGG for the per-bucket optimization.
+            if (!agg.getType().isGlobal() || agg.getGroupBys().isEmpty()) {
+                return null;
+            }
+
+            agg.setUsePerBucketOptmize(true);
+            scan.setNeedOutputChunkByBucket(true);
+            return null;
+        }
+    }
+
+    private static class UseSortAGGRule extends NoopVisitor {
+        @Override
+        public Void visitPhysicalHashAggregate(OptExpression optExpression, Void context) {
+            if (optExpression.getInputs().get(0).getOp().getOpType() != OperatorType.PHYSICAL_OLAP_SCAN) {
+                return visit(optExpression, context);
+            }
+
+            PhysicalOlapScanOperator scan = (PhysicalOlapScanOperator) optExpression.getInputs().get(0).getOp();
+            PhysicalHashAggregateOperator agg = (PhysicalHashAggregateOperator) optExpression.getOp();
+
+            // Now we only support one-stage AGG.
+            // TODO: support multi-stage AGG
+            if (!agg.getType().isGlobal() || agg.getGroupBys().isEmpty()) {
+                return null;
+            }
+
+            // The same key may land in different tablets when multiple partitions are selected.
+            if (scan.getSelectedPartitionId().size() > 1) {
+                return null;
+            }
+
+            for (ColumnRefOperator groupBy : agg.getGroupBys()) {
+                if (!scan.getColRefToColumnMetaMap().containsKey(groupBy)) {
+                    return null;
+                }
+
+                if (!scan.getColRefToColumnMetaMap().get(groupBy).isKey()) {
+                    return null;
+                }
+            }
+
+            List<Column> nonKeyGroupBys =
+                    agg.getGroupBys().stream().map(s -> scan.getColRefToColumnMetaMap().get(s)).collect(
+                            Collectors.toList());
+
+            for (Column column : ((OlapTable) scan.getTable()).getSchemaByIndexId(scan.getSelectedIndexId())) {
+                if (!nonKeyGroupBys.contains(column)) {
+                    break;
+                }
+                nonKeyGroupBys.remove(column);
+            }
+
+            if (nonKeyGroupBys.isEmpty()) {
+                agg.setUseSortAgg(true);
+                scan.setNeedSortedByKeyPerTablet(true);
+            }
+
+            return null;
+        }
+    }
+}
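The loop at the end of UseSortAGGRule encodes a prefix test: after the earlier checks confirm every GROUP BY column is a key column, sort aggregation applies only if the set of GROUP BY columns forms a prefix of the index's sort key (order within the GROUP BY list is irrelevant). A self-contained sketch of that check with a worked example:

```java
import java.util.ArrayList;
import java.util.List;

public class SortAggPrefixCheckDemo {
    // Mirrors the rule's loop: walk the schema in sort-key order and consume
    // group-by columns; sort agg applies only if every group-by column is consumed,
    // i.e. the group-by set is a prefix of the sort key.
    static boolean groupBysArePrefixOfSortKey(List<String> schemaInKeyOrder, List<String> groupBys) {
        List<String> remaining = new ArrayList<>(groupBys);
        for (String column : schemaInKeyOrder) {
            if (!remaining.contains(column)) {
                break;
            }
            remaining.remove(column);
        }
        return remaining.isEmpty();
    }

    public static void main(String[] args) {
        List<String> sortKey = List.of("k1", "k2", "k3", "v1");
        System.out.println(groupBysArePrefixOfSortKey(sortKey, List.of("k2", "k1"))); // true: {k1, k2} is a prefix
        System.out.println(groupBysArePrefixOfSortKey(sortKey, List.of("k2")));       // false: k1 is missing
    }
}
```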
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/UseSortAggregateRule.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/UseSortAggregateRule.java
deleted file mode 100644
index d6c2e727b6c13..0000000000000
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/tree/UseSortAggregateRule.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright 2021-present StarRocks, Inc. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-
-package com.starrocks.sql.optimizer.rule.tree;
-
-import com.starrocks.catalog.Column;
-import com.starrocks.catalog.OlapTable;
-import com.starrocks.qe.ConnectContext;
-import com.starrocks.sql.optimizer.OptExpression;
-import com.starrocks.sql.optimizer.OptExpressionVisitor;
-import com.starrocks.sql.optimizer.operator.OperatorType;
-import com.starrocks.sql.optimizer.operator.physical.PhysicalHashAggregateOperator;
-import com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator;
-import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
-import com.starrocks.sql.optimizer.task.TaskContext;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class UseSortAggregateRule extends OptExpressionVisitor<Void, Void> implements TreeRewriteRule {
-    @Override
-    public OptExpression rewrite(OptExpression root, TaskContext taskContext) {
-        if (ConnectContext.get().getSessionVariable().isEnableQueryCache()) {
-            return root;
-        }
-        if (!ConnectContext.get().getSessionVariable().isEnableSortAggregate()) {
-            return root;
-        }
-        root.getOp().accept(this, root, null);
-        return root;
-    }
-
-    @Override
-    public Void visit(OptExpression optExpression, Void context) {
-        for (OptExpression opt : optExpression.getInputs()) {
-            opt.getOp().accept(this, opt, context);
-        }
-
-        return null;
-    }
-
-    @Override
-    public Void visitPhysicalHashAggregate(OptExpression optExpression, Void context) {
-        if (optExpression.getInputs().get(0).getOp().getOpType() != OperatorType.PHYSICAL_OLAP_SCAN) {
-            return visit(optExpression, context);
-        }
-
-        PhysicalOlapScanOperator scan = (PhysicalOlapScanOperator) optExpression.getInputs().get(0).getOp();
-
-        PhysicalHashAggregateOperator agg = (PhysicalHashAggregateOperator) optExpression.getOp();
-
-        // Now we only support one-stage AGG
-        // TODO: support multi-stage AGG
-        if (!agg.getType().isGlobal() || agg.getGroupBys().isEmpty()) {
-            return null;
-        }
-
-        // the same key in multi partition are not in the same tablet
-        if (scan.getSelectedPartitionId().size() > 1) {
-            return null;
-        }
-
-        for (ColumnRefOperator groupBy : agg.getGroupBys()) {
-            if (!scan.getColRefToColumnMetaMap().containsKey(groupBy)) {
-                return null;
-            }
-
-            if (!scan.getColRefToColumnMetaMap().get(groupBy).isKey()) {
-                return null;
-            }
-        }
-
-        List<Column> nonKeyGroupBys = agg.getGroupBys().stream().map(s -> scan.getColRefToColumnMetaMap().get(s)).collect(
-                Collectors.toList());
-
-        for (Column column : ((OlapTable) scan.getTable()).getSchemaByIndexId(scan.getSelectedIndexId())) {
-            if (!nonKeyGroupBys.contains(column)) {
-                break;
-            }
-            nonKeyGroupBys.remove(column);
-        }
-
-        if (nonKeyGroupBys.isEmpty()) {
-            agg.setUseSortAgg(true);
-            scan.setNeedSortedByKeyPerTablet(true);
-        }
-
-        return null;
-    }
-}
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java
index 24f73ed9fbf2e..0c144550851ce 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java
@@ -775,7 +775,10 @@ public PlanFragment visitPhysicalOlapScan(OptExpression optExpr, ExecPlan contex
             OlapScanNode scanNode = new OlapScanNode(context.getNextNodeId(), tupleDescriptor, "OlapScanNode");
             scanNode.setLimit(node.getLimit());
             scanNode.computeStatistics(optExpr.getStatistics());
             scanNode.setScanOptimzeOption(node.getScanOptimzeOption());
+            scanNode.setIsSortedByKeyPerTablet(node.needSortedByKeyPerTablet());
+            scanNode.setIsOutputChunkByBucket(node.needOutputChunkByBucket());
+
             // set tablet
             try {
                 scanNode.updateScanInfo(node.getSelectedPartitionId(),
@@ -872,7 +875,6 @@ public PlanFragment visitPhysicalOlapScan(OptExpression optExpr, ExecPlan contex
             // set unused output columns
             setUnUsedOutputColumns(node, scanNode, predicates, referenceTable);
 
-            scanNode.setIsSortedByKeyPerTablet(node.needSortedByKeyPerTablet());
             // set isPreAggregation
             scanNode.setIsPreAggregation(node.isPreAggregation(), node.getTurnOffReason());
@@ -1915,6 +1917,7 @@ public PlanFragment visitPhysicalHashAggregate(OptExpression optExpr, ExecPlan c
             }
             aggregationNode.setUseSortAgg(node.isUseSortAgg());
+            aggregationNode.setUsePerBucketOptimize(node.isUsePerBucketOptmize());
             aggregationNode.setStreamingPreaggregationMode(node.getNeededPreaggregationMode());
             aggregationNode.setHasNullableGenerateChild();
             aggregationNode.computeStatistics(optExpr.getStatistics());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 6888c53baa130..24f44f4bff498 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -129,6 +129,8 @@ struct TInternalScanRange {
     // Allow this query to cache remote data on local disks or not.
     // Only the cloud native tablet will respect this field.
     12: optional bool fill_data_cache = true;
+    // Used by the per-bucket compute optimization.
+    13: optional i32 bucket_sequence
 }
 
 enum TFileFormatType {
@@ -500,6 +502,8 @@ struct TOlapScanNode {
     28: optional i32 max_parallel_scan_instance_num
     29: optional list<TColumnAccessPath> column_access_paths
     30: optional bool use_pk_index
+    31: required list<Descriptors.TColumn> columns_desc
+    32: optional bool output_chunk_by_bucket
 }
 
 struct TJDBCScanNode {
@@ -722,6 +726,8 @@ struct TAggregationNode {
     26: optional bool interpolate_passthrough = false
 
     27: optional bool use_sort_agg
+
+    28: optional bool use_per_bucket_optimize
 }
 
 struct TRepeatNode {
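Putting the thrift pieces together: output_chunk_by_bucket makes the scan emit chunks grouped by tablet bucket and tagged with bucket_sequence, and use_per_bucket_optimize lets the blocking aggregation flush at each bucket boundary, since colocated data is hash-distributed by bucket and a group key never spans two buckets. A toy simulation of that contract (plain Java, not the BE operator):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerBucketAggDemo {
    record Chunk(int bucketSequence, List<String> keys) {}

    public static void main(String[] args) {
        // Chunks arrive ordered by bucket (output_chunk_by_bucket = true on the scan).
        List<Chunk> chunks = List.of(
                new Chunk(0, List.of("a", "b", "a")),
                new Chunk(0, List.of("b")),
                new Chunk(1, List.of("c", "c")),
                new Chunk(2, List.of("d")));

        Map<String, Long> counts = new HashMap<>();
        int currentBucket = -1;
        List<Map<String, Long>> flushed = new ArrayList<>();
        for (Chunk chunk : chunks) {
            if (currentBucket != -1 && chunk.bucketSequence() != currentBucket) {
                // Bucket boundary: a key cannot reappear in a later bucket, so emit and reset.
                flushed.add(Map.copyOf(counts));
                counts.clear();
            }
            currentBucket = chunk.bucketSequence();
            for (String key : chunk.keys()) {
                counts.merge(key, 1L, Long::sum);
            }
        }
        flushed.add(Map.copyOf(counts)); // flush the last bucket
        // e.g. [{a=2, b=2}, {c=2}, {d=1}] (map iteration order may vary)
        System.out.println(flushed);
    }
}
```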