Skip to content

Commit

Permalink
rename OlapSchemaScan -> SchemaScan
Browse files Browse the repository at this point in the history
Signed-off-by: Letian Jiang <letian.jiang@outlook.com>
  • Loading branch information
letian-jiang committed Aug 8, 2023
1 parent 4dc84b0 commit c5fb0d4
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 139 deletions.
6 changes: 3 additions & 3 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ set(EXEC_FILES
pipeline/scan/meta_scan_prepare_operator.cpp
pipeline/scan/olap_meta_scan_prepare_operator.cpp
pipeline/scan/lake_meta_scan_prepare_operator.cpp
pipeline/scan/olap_schema_chunk_source.cpp
pipeline/scan/olap_schema_scan_operator.cpp
pipeline/scan/olap_schema_scan_context.cpp
pipeline/scan/schema_chunk_source.cpp
pipeline/scan/schema_scan_operator.cpp
pipeline/scan/schema_scan_context.cpp
pipeline/sink/iceberg_table_sink_operator.cpp
pipeline/scan/morsel.cpp
pipeline/scan/chunk_buffer_limiter.cpp
Expand Down
98 changes: 0 additions & 98 deletions be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/scan/olap_schema_chunk_source.h"
#include "exec/pipeline/scan/schema_chunk_source.h"

#include <boost/algorithm/string.hpp>

Expand All @@ -21,13 +21,13 @@

namespace starrocks::pipeline {

OlapSchemaChunkSource::OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
const OlapSchemaScanContextPtr& ctx)
SchemaChunkSource::SchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
const SchemaScanContextPtr& ctx)
: ChunkSource(op, runtime_profile, std::move(morsel), ctx->get_chunk_buffer()), _ctx(ctx) {}

OlapSchemaChunkSource::~OlapSchemaChunkSource() = default;
SchemaChunkSource::~SchemaChunkSource() = default;

Status OlapSchemaChunkSource::prepare(RuntimeState* state) {
Status SchemaChunkSource::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ChunkSource::prepare(state));
_dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_ctx->tuple_id());
if (_dest_tuple_desc == nullptr) {
Expand Down Expand Up @@ -84,9 +84,9 @@ Status OlapSchemaChunkSource::prepare(RuntimeState* state) {
return _schema_scanner->start(state);
}

void OlapSchemaChunkSource::close(RuntimeState* state) {}
void SchemaChunkSource::close(RuntimeState* state) {}

Status OlapSchemaChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
Status SchemaChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
const std::vector<SlotDescriptor*>& src_slot_descs = _schema_scanner->get_slot_descs();
const std::vector<SlotDescriptor*>& dest_slot_descs = _dest_tuple_desc->slots();

Expand Down Expand Up @@ -162,8 +162,7 @@ Status OlapSchemaChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk)
return Status::OK();
}

const workgroup::WorkGroupScanSchedEntity* OlapSchemaChunkSource::_scan_sched_entity(
const workgroup::WorkGroup* wg) const {
const workgroup::WorkGroupScanSchedEntity* SchemaChunkSource::_scan_sched_entity(const workgroup::WorkGroup* wg) const {
DCHECK(wg != nullptr);
return wg->scan_sched_entity();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "column/vectorized_fwd.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/pipeline/scan/chunk_source.h"
#include "exec/pipeline/scan/olap_schema_scan_context.h"
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/pipeline/scan/schema_scan_context.h"
#include "runtime/runtime_state.h"
#include "storage/chunk_helper.h"

Expand All @@ -28,12 +28,12 @@ class SchemaScanner;

namespace pipeline {

class OlapSchemaChunkSource final : public ChunkSource {
class SchemaChunkSource final : public ChunkSource {
public:
OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
const OlapSchemaScanContextPtr& ctx);
SchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
const SchemaScanContextPtr& ctx);

~OlapSchemaChunkSource() override;
~SchemaChunkSource() override;

Status prepare(RuntimeState* state) override;

Expand All @@ -47,7 +47,7 @@ class OlapSchemaChunkSource final : public ChunkSource {
const TupleDescriptor* _dest_tuple_desc;
std::unique_ptr<SchemaScanner> _schema_scanner;

OlapSchemaScanContextPtr _ctx;
SchemaScanContextPtr _ctx;

RuntimeProfile::Counter* _filter_timer = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/scan/olap_schema_scan_context.h"
#include "exec/pipeline/scan/schema_scan_context.h"

#include <boost/algorithm/string.hpp>

Expand All @@ -21,15 +21,15 @@

namespace starrocks::pipeline {

Status OlapSchemaScanContext::prepare(RuntimeState* state) {
Status SchemaScanContext::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Expr::create_expr_trees(&_obj_pool, _tnode.conjuncts, &_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::open(_conjunct_ctxs, state));
RETURN_IF_ERROR(_prepare_params(state));
return Status::OK();
}

Status OlapSchemaScanContext::_prepare_params(RuntimeState* state) {
Status SchemaScanContext::_prepare_params(RuntimeState* state) {
_param = std::make_shared<SchemaScannerParam>();
if (_tnode.schema_scan_node.__isset.catalog_name) {
_param->catalog = _obj_pool.add(new std::string(_tnode.schema_scan_node.catalog_name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ class SchemaScannerParam;
class SchemaScanner;

namespace pipeline {
class OlapSchemaScanContext;
using OlapSchemaScanContextPtr = std::shared_ptr<OlapSchemaScanContext>;
class SchemaScanContext;
using SchemaScanContextPtr = std::shared_ptr<SchemaScanContext>;

class OlapSchemaScanContext {
class SchemaScanContext {
public:
OlapSchemaScanContext(const TPlanNode& tnode, BalancedChunkBuffer& chunk_buffer)
SchemaScanContext(const TPlanNode& tnode, BalancedChunkBuffer& chunk_buffer)
: _tnode(tnode),
_table_name(tnode.schema_scan_node.table_name),
_tuple_id(tnode.schema_scan_node.tuple_id),
_chunk_buffer(chunk_buffer) {}

~OlapSchemaScanContext() = default;
~SchemaScanContext() = default;

Status prepare(RuntimeState* state);

Expand Down
97 changes: 97 additions & 0 deletions be/src/exec/pipeline/scan/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/scan/schema_scan_operator.h"

#include <utility>

#include "exec/pipeline/pipeline_builder.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/pipeline/scan/schema_chunk_source.h"
#include "gen_cpp/Types_types.h"

namespace starrocks::pipeline {

SchemaScanOperatorFactory::SchemaScanOperatorFactory(int32_t id, ScanNode* schema_scan_node, size_t dop,
const TPlanNode& t_node, ChunkBufferLimiterPtr buffer_limiter)
: ScanOperatorFactory(id, schema_scan_node),
_chunk_buffer(BalanceStrategy::kDirect, dop, std::move(buffer_limiter)) {
_ctx = std::make_shared<SchemaScanContext>(t_node, _chunk_buffer);
}

Status SchemaScanOperatorFactory::do_prepare(RuntimeState* state) {
return _ctx->prepare(state);
}

void SchemaScanOperatorFactory::do_close(RuntimeState* state) {}

OperatorPtr SchemaScanOperatorFactory::do_create(int32_t dop, int32_t driver_sequence) {
return std::make_shared<SchemaScanOperator>(this, _id, driver_sequence, dop, _scan_node, _ctx);
}

SchemaScanOperator::SchemaScanOperator(OperatorFactory* factory, int32_t id, int32_t driver_sequence, int32_t dop,
ScanNode* scan_node, SchemaScanContextPtr ctx)
: ScanOperator(factory, id, driver_sequence, dop, scan_node), _ctx(std::move(ctx)) {}

SchemaScanOperator::~SchemaScanOperator() = default;

Status SchemaScanOperator::do_prepare(RuntimeState* state) {
return Status::OK();
}

void SchemaScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr SchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
return std::make_shared<SchemaChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), _ctx);
}

ChunkPtr SchemaScanOperator::get_chunk_from_buffer() {
ChunkPtr chunk = nullptr;
if (_ctx->get_chunk_buffer().try_get(_driver_sequence, &chunk)) {
return chunk;
}
return nullptr;
}

size_t SchemaScanOperator::num_buffered_chunks() const {
return _ctx->get_chunk_buffer().size(_driver_sequence);
}

size_t SchemaScanOperator::buffer_size() const {
return _ctx->get_chunk_buffer().limiter()->size();
}

size_t SchemaScanOperator::buffer_capacity() const {
return _ctx->get_chunk_buffer().limiter()->capacity();
}

size_t SchemaScanOperator::default_buffer_capacity() const {
return _ctx->get_chunk_buffer().limiter()->default_capacity();
}

ChunkBufferTokenPtr SchemaScanOperator::pin_chunk(int num_chunks) {
return _ctx->get_chunk_buffer().limiter()->pin(num_chunks);
}

bool SchemaScanOperator::is_buffer_full() const {
return _ctx->get_chunk_buffer().limiter()->is_full();
}

void SchemaScanOperator::set_buffer_finished() {
_ctx->get_chunk_buffer().set_finished(_driver_sequence);
}

} // namespace starrocks::pipeline
Loading

0 comments on commit c5fb0d4

Please sign in to comment.