diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 8c04963e18cf10..1edf8d7527fac5 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -183,9 +183,9 @@ set(EXEC_FILES pipeline/scan/meta_scan_prepare_operator.cpp pipeline/scan/olap_meta_scan_prepare_operator.cpp pipeline/scan/lake_meta_scan_prepare_operator.cpp - pipeline/scan/olap_schema_chunk_source.cpp - pipeline/scan/olap_schema_scan_operator.cpp - pipeline/scan/olap_schema_scan_context.cpp + pipeline/scan/schema_chunk_source.cpp + pipeline/scan/schema_scan_operator.cpp + pipeline/scan/schema_scan_context.cpp pipeline/sink/iceberg_table_sink_operator.cpp pipeline/scan/morsel.cpp pipeline/scan/chunk_buffer_limiter.cpp diff --git a/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp b/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp deleted file mode 100644 index f07671da5265f0..00000000000000 --- a/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "exec/pipeline/scan/olap_schema_scan_operator.h" - -#include - -#include "exec/pipeline/pipeline_builder.h" -#include "exec/pipeline/scan/balanced_chunk_buffer.h" -#include "exec/pipeline/scan/olap_schema_chunk_source.h" -#include "exec/pipeline/scan/scan_operator.h" -#include "gen_cpp/Types_types.h" - -namespace starrocks::pipeline { - -OlapSchemaScanOperatorFactory::OlapSchemaScanOperatorFactory(int32_t id, ScanNode* schema_scan_node, size_t dop, - const TPlanNode& t_node, - ChunkBufferLimiterPtr buffer_limiter) - : ScanOperatorFactory(id, schema_scan_node), - _chunk_buffer(BalanceStrategy::kDirect, dop, std::move(buffer_limiter)) { - _ctx = std::make_shared(t_node, _chunk_buffer); -} - -Status OlapSchemaScanOperatorFactory::do_prepare(RuntimeState* state) { - return _ctx->prepare(state); -} - -void OlapSchemaScanOperatorFactory::do_close(RuntimeState* state) {} - -OperatorPtr OlapSchemaScanOperatorFactory::do_create(int32_t dop, int32_t driver_sequence) { - return std::make_shared(this, _id, driver_sequence, dop, _scan_node, _ctx); -} - -OlapSchemaScanOperator::OlapSchemaScanOperator(OperatorFactory* factory, int32_t id, int32_t driver_sequence, - int32_t dop, ScanNode* scan_node, OlapSchemaScanContextPtr ctx) - : ScanOperator(factory, id, driver_sequence, dop, scan_node), _ctx(std::move(ctx)) {} - -OlapSchemaScanOperator::~OlapSchemaScanOperator() = default; - -Status OlapSchemaScanOperator::do_prepare(RuntimeState* state) { - return Status::OK(); -} - -void OlapSchemaScanOperator::do_close(RuntimeState* state) {} - -ChunkSourcePtr OlapSchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { - return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), - std::move(morsel), _ctx); -} - -ChunkPtr OlapSchemaScanOperator::get_chunk_from_buffer() { - ChunkPtr chunk = nullptr; - if (_ctx->get_chunk_buffer().try_get(_driver_sequence, &chunk)) { - return chunk; - } - return nullptr; -} - -size_t OlapSchemaScanOperator::num_buffered_chunks() const { - return _ctx->get_chunk_buffer().size(_driver_sequence); -} - -size_t OlapSchemaScanOperator::buffer_size() const { - return _ctx->get_chunk_buffer().limiter()->size(); -} - -size_t OlapSchemaScanOperator::buffer_capacity() const { - return _ctx->get_chunk_buffer().limiter()->capacity(); -} - -size_t OlapSchemaScanOperator::default_buffer_capacity() const { - return _ctx->get_chunk_buffer().limiter()->default_capacity(); -} - -ChunkBufferTokenPtr OlapSchemaScanOperator::pin_chunk(int num_chunks) { - return _ctx->get_chunk_buffer().limiter()->pin(num_chunks); -} - -bool OlapSchemaScanOperator::is_buffer_full() const { - return _ctx->get_chunk_buffer().limiter()->is_full(); -} - -void OlapSchemaScanOperator::set_buffer_finished() { - _ctx->get_chunk_buffer().set_finished(_driver_sequence); -} - -} // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp b/be/src/exec/pipeline/scan/schema_chunk_source.cpp similarity index 90% rename from be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp rename to be/src/exec/pipeline/scan/schema_chunk_source.cpp index 79584fd7dd267b..e725376889965b 100644 --- a/be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp +++ b/be/src/exec/pipeline/scan/schema_chunk_source.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "exec/pipeline/scan/olap_schema_chunk_source.h" +#include "exec/pipeline/scan/schema_chunk_source.h" #include @@ -21,13 +21,13 @@ namespace starrocks::pipeline { -OlapSchemaChunkSource::OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, - const OlapSchemaScanContextPtr& ctx) +SchemaChunkSource::SchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + const SchemaScanContextPtr& ctx) : ChunkSource(op, runtime_profile, std::move(morsel), ctx->get_chunk_buffer()), _ctx(ctx) {} -OlapSchemaChunkSource::~OlapSchemaChunkSource() = default; +SchemaChunkSource::~SchemaChunkSource() = default; -Status OlapSchemaChunkSource::prepare(RuntimeState* state) { +Status SchemaChunkSource::prepare(RuntimeState* state) { RETURN_IF_ERROR(ChunkSource::prepare(state)); _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_ctx->tuple_id()); if (_dest_tuple_desc == nullptr) { @@ -84,9 +84,9 @@ Status OlapSchemaChunkSource::prepare(RuntimeState* state) { return _schema_scanner->start(state); } -void OlapSchemaChunkSource::close(RuntimeState* state) {} +void SchemaChunkSource::close(RuntimeState* state) {} -Status OlapSchemaChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) { +Status SchemaChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) { const std::vector& src_slot_descs = _schema_scanner->get_slot_descs(); const std::vector& dest_slot_descs = _dest_tuple_desc->slots(); @@ -162,8 +162,7 @@ Status OlapSchemaChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) return Status::OK(); } -const workgroup::WorkGroupScanSchedEntity* OlapSchemaChunkSource::_scan_sched_entity( - const workgroup::WorkGroup* wg) const { +const workgroup::WorkGroupScanSchedEntity* SchemaChunkSource::_scan_sched_entity(const workgroup::WorkGroup* wg) const { DCHECK(wg != nullptr); return wg->scan_sched_entity(); } diff --git a/be/src/exec/pipeline/scan/olap_schema_chunk_source.h b/be/src/exec/pipeline/scan/schema_chunk_source.h similarity index 84% rename from be/src/exec/pipeline/scan/olap_schema_chunk_source.h rename to be/src/exec/pipeline/scan/schema_chunk_source.h index 657a9df7a8a765..3b5be0c68cef53 100644 --- a/be/src/exec/pipeline/scan/olap_schema_chunk_source.h +++ b/be/src/exec/pipeline/scan/schema_chunk_source.h @@ -17,8 +17,8 @@ #include "column/vectorized_fwd.h" #include "exec/pipeline/scan/balanced_chunk_buffer.h" #include "exec/pipeline/scan/chunk_source.h" -#include "exec/pipeline/scan/olap_schema_scan_context.h" #include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/scan/schema_scan_context.h" #include "runtime/runtime_state.h" #include "storage/chunk_helper.h" @@ -28,12 +28,12 @@ class SchemaScanner; namespace pipeline { -class OlapSchemaChunkSource final : public ChunkSource { +class SchemaChunkSource final : public ChunkSource { public: - OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, - const OlapSchemaScanContextPtr& ctx); + SchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + const SchemaScanContextPtr& ctx); - ~OlapSchemaChunkSource() override; + ~SchemaChunkSource() override; Status prepare(RuntimeState* state) override; @@ -47,7 +47,7 @@ class OlapSchemaChunkSource final : public ChunkSource { const TupleDescriptor* _dest_tuple_desc; std::unique_ptr _schema_scanner; - OlapSchemaScanContextPtr _ctx; + SchemaScanContextPtr _ctx; RuntimeProfile::Counter* _filter_timer = nullptr; diff --git a/be/src/exec/pipeline/scan/olap_schema_scan_context.cpp b/be/src/exec/pipeline/scan/schema_scan_context.cpp similarity index 96% rename from be/src/exec/pipeline/scan/olap_schema_scan_context.cpp rename to be/src/exec/pipeline/scan/schema_scan_context.cpp index ce403ea0507d34..61e1d9bc8ff335 100644 --- a/be/src/exec/pipeline/scan/olap_schema_scan_context.cpp +++ b/be/src/exec/pipeline/scan/schema_scan_context.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "exec/pipeline/scan/olap_schema_scan_context.h" +#include "exec/pipeline/scan/schema_scan_context.h" #include @@ -21,7 +21,7 @@ namespace starrocks::pipeline { -Status OlapSchemaScanContext::prepare(RuntimeState* state) { +Status SchemaScanContext::prepare(RuntimeState* state) { RETURN_IF_ERROR(Expr::create_expr_trees(&_obj_pool, _tnode.conjuncts, &_conjunct_ctxs, state)); RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state)); RETURN_IF_ERROR(Expr::open(_conjunct_ctxs, state)); @@ -29,7 +29,7 @@ Status OlapSchemaScanContext::prepare(RuntimeState* state) { return Status::OK(); } -Status OlapSchemaScanContext::_prepare_params(RuntimeState* state) { +Status SchemaScanContext::_prepare_params(RuntimeState* state) { _param = std::make_shared(); if (_tnode.schema_scan_node.__isset.catalog_name) { _param->catalog = _obj_pool.add(new std::string(_tnode.schema_scan_node.catalog_name)); diff --git a/be/src/exec/pipeline/scan/olap_schema_scan_context.h b/be/src/exec/pipeline/scan/schema_scan_context.h similarity index 87% rename from be/src/exec/pipeline/scan/olap_schema_scan_context.h rename to be/src/exec/pipeline/scan/schema_scan_context.h index bc4564e8723fab..8a08e6629f1352 100644 --- a/be/src/exec/pipeline/scan/olap_schema_scan_context.h +++ b/be/src/exec/pipeline/scan/schema_scan_context.h @@ -27,18 +27,18 @@ class SchemaScannerParam; class SchemaScanner; namespace pipeline { -class OlapSchemaScanContext; -using OlapSchemaScanContextPtr = std::shared_ptr; +class SchemaScanContext; +using SchemaScanContextPtr = std::shared_ptr; -class OlapSchemaScanContext { +class SchemaScanContext { public: - OlapSchemaScanContext(const TPlanNode& tnode, BalancedChunkBuffer& chunk_buffer) + SchemaScanContext(const TPlanNode& tnode, BalancedChunkBuffer& chunk_buffer) : _tnode(tnode), _table_name(tnode.schema_scan_node.table_name), _tuple_id(tnode.schema_scan_node.tuple_id), _chunk_buffer(chunk_buffer) {} - ~OlapSchemaScanContext() = default; + ~SchemaScanContext() = default; Status prepare(RuntimeState* state); diff --git a/be/src/exec/pipeline/scan/schema_scan_operator.cpp b/be/src/exec/pipeline/scan/schema_scan_operator.cpp new file mode 100644 index 00000000000000..182609bd731c69 --- /dev/null +++ b/be/src/exec/pipeline/scan/schema_scan_operator.cpp @@ -0,0 +1,97 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/pipeline/scan/schema_scan_operator.h" + +#include + +#include "exec/pipeline/pipeline_builder.h" +#include "exec/pipeline/scan/balanced_chunk_buffer.h" +#include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/scan/schema_chunk_source.h" +#include "gen_cpp/Types_types.h" + +namespace starrocks::pipeline { + +SchemaScanOperatorFactory::SchemaScanOperatorFactory(int32_t id, ScanNode* schema_scan_node, size_t dop, + const TPlanNode& t_node, ChunkBufferLimiterPtr buffer_limiter) + : ScanOperatorFactory(id, schema_scan_node), + _chunk_buffer(BalanceStrategy::kDirect, dop, std::move(buffer_limiter)) { + _ctx = std::make_shared(t_node, _chunk_buffer); +} + +Status SchemaScanOperatorFactory::do_prepare(RuntimeState* state) { + return _ctx->prepare(state); +} + +void SchemaScanOperatorFactory::do_close(RuntimeState* state) {} + +OperatorPtr SchemaScanOperatorFactory::do_create(int32_t dop, int32_t driver_sequence) { + return std::make_shared(this, _id, driver_sequence, dop, _scan_node, _ctx); +} + +SchemaScanOperator::SchemaScanOperator(OperatorFactory* factory, int32_t id, int32_t driver_sequence, int32_t dop, + ScanNode* scan_node, SchemaScanContextPtr ctx) + : ScanOperator(factory, id, driver_sequence, dop, scan_node), _ctx(std::move(ctx)) {} + +SchemaScanOperator::~SchemaScanOperator() = default; + +Status SchemaScanOperator::do_prepare(RuntimeState* state) { + return Status::OK(); +} + +void SchemaScanOperator::do_close(RuntimeState* state) {} + +ChunkSourcePtr SchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), + std::move(morsel), _ctx); +} + +ChunkPtr SchemaScanOperator::get_chunk_from_buffer() { + ChunkPtr chunk = nullptr; + if (_ctx->get_chunk_buffer().try_get(_driver_sequence, &chunk)) { + return chunk; + } + return nullptr; +} + +size_t SchemaScanOperator::num_buffered_chunks() const { + return _ctx->get_chunk_buffer().size(_driver_sequence); +} + +size_t SchemaScanOperator::buffer_size() const { + return _ctx->get_chunk_buffer().limiter()->size(); +} + +size_t SchemaScanOperator::buffer_capacity() const { + return _ctx->get_chunk_buffer().limiter()->capacity(); +} + +size_t SchemaScanOperator::default_buffer_capacity() const { + return _ctx->get_chunk_buffer().limiter()->default_capacity(); +} + +ChunkBufferTokenPtr SchemaScanOperator::pin_chunk(int num_chunks) { + return _ctx->get_chunk_buffer().limiter()->pin(num_chunks); +} + +bool SchemaScanOperator::is_buffer_full() const { + return _ctx->get_chunk_buffer().limiter()->is_full(); +} + +void SchemaScanOperator::set_buffer_finished() { + _ctx->get_chunk_buffer().set_finished(_driver_sequence); +} + +} // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/scan/olap_schema_scan_operator.h b/be/src/exec/pipeline/scan/schema_scan_operator.h similarity index 74% rename from be/src/exec/pipeline/scan/olap_schema_scan_operator.h rename to be/src/exec/pipeline/scan/schema_scan_operator.h index bec2cd4959c841..0486dae52686ae 100644 --- a/be/src/exec/pipeline/scan/olap_schema_scan_operator.h +++ b/be/src/exec/pipeline/scan/schema_scan_operator.h @@ -17,19 +17,19 @@ #include "exec/pipeline/pipeline_builder.h" #include "exec/pipeline/scan/balanced_chunk_buffer.h" #include "exec/pipeline/scan/chunk_buffer_limiter.h" -#include "exec/pipeline/scan/olap_schema_scan_context.h" #include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/scan/schema_scan_context.h" #include "exec/schema_scan_node.h" #include "gen_cpp/Types_types.h" namespace starrocks::pipeline { -class OlapSchemaScanOperatorFactory final : public ScanOperatorFactory { +class SchemaScanOperatorFactory final : public ScanOperatorFactory { public: - OlapSchemaScanOperatorFactory(int32_t id, ScanNode* schema_scan_node, size_t dop, const TPlanNode& t_node, - ChunkBufferLimiterPtr buffer_limiter); + SchemaScanOperatorFactory(int32_t id, ScanNode* schema_scan_node, size_t dop, const TPlanNode& t_node, + ChunkBufferLimiterPtr buffer_limiter); - ~OlapSchemaScanOperatorFactory() override = default; + ~SchemaScanOperatorFactory() override = default; bool with_morsels() const override { return true; } @@ -40,16 +40,16 @@ class OlapSchemaScanOperatorFactory final : public ScanOperatorFactory { BalancedChunkBuffer& get_chunk_buffer() { return _chunk_buffer; } private: - OlapSchemaScanContextPtr _ctx; + SchemaScanContextPtr _ctx; BalancedChunkBuffer _chunk_buffer; }; -class OlapSchemaScanOperator final : public ScanOperator { +class SchemaScanOperator final : public ScanOperator { public: - OlapSchemaScanOperator(OperatorFactory* factory, int32_t id, int32_t driver_sequence, int32_t dop, - ScanNode* scan_node, OlapSchemaScanContextPtr ctx); + SchemaScanOperator(OperatorFactory* factory, int32_t id, int32_t driver_sequence, int32_t dop, ScanNode* scan_node, + SchemaScanContextPtr ctx); - ~OlapSchemaScanOperator() override; + ~SchemaScanOperator() override; Status do_prepare(RuntimeState* state) override; void do_close(RuntimeState* state) override; @@ -68,6 +68,6 @@ class OlapSchemaScanOperator final : public ScanOperator { bool is_buffer_full() const override; void set_buffer_finished() override; - OlapSchemaScanContextPtr _ctx; + SchemaScanContextPtr _ctx; }; } // namespace starrocks::pipeline diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp index 8d21a5e176ddf7..fa5eff350db9e9 100644 --- a/be/src/exec/schema_scan_node.cpp +++ b/be/src/exec/schema_scan_node.cpp @@ -17,8 +17,8 @@ #include #include "column/column_helper.h" -#include "exec/pipeline/scan/olap_schema_scan_context.h" -#include "exec/pipeline/scan/olap_schema_scan_operator.h" +#include "exec/pipeline/scan/schema_scan_context.h" +#include "exec/pipeline/scan/schema_scan_operator.h" #include "exec/schema_scanner/schema_helper.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" @@ -307,8 +307,8 @@ std::vector> SchemaScanNode::decompos pipeline::ChunkBufferLimiterPtr buffer_limiter = std::make_unique( buffer_capacity, buffer_capacity, _mem_limit, runtime_state()->chunk_size()); - auto scan_op = std::make_shared(context->next_operator_id(), this, dop, - _tnode, std::move(buffer_limiter)); + auto scan_op = std::make_shared(context->next_operator_id(), this, dop, _tnode, + std::move(buffer_limiter)); return pipeline::decompose_scan_node_to_pipeline(scan_op, this, context); }