Skip to content

Commit

Permalink
[Refactor] Fix clang complains 3 (StarRocks#12812)
Browse files Browse the repository at this point in the history
  • Loading branch information
imay authored Nov 1, 2022
1 parent 367e36b commit d9e8c27
Show file tree
Hide file tree
Showing 43 changed files with 60 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void AggregateDistinctStreamingSourceOperator::close(RuntimeState* state) {

StatusOr<vectorized::ChunkPtr> AggregateDistinctStreamingSourceOperator::pull_chunk(RuntimeState* state) {
if (!_aggregator->is_chunk_buffer_empty()) {
return std::move(_aggregator->poll_chunk_buffer());
return _aggregator->poll_chunk_buffer();
}

vectorized::ChunkPtr chunk = std::make_shared<vectorized::Chunk>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ StatusOr<vectorized::ChunkPtr> AggregateStreamingSourceOperator::pull_chunk(Runt
// It is no need to distinguish whether streaming or aggregation mode
// We just first read chunk from buffer and finally read chunk from hash table
if (!_aggregator->is_chunk_buffer_empty()) {
return std::move(_aggregator->poll_chunk_buffer());
return _aggregator->poll_chunk_buffer();
}

// Even if it is streaming mode, the purpose of reading from hash table is to
Expand Down
41 changes: 5 additions & 36 deletions be/src/exec/pipeline/aggregate/repeat/repeat_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,14 @@ using namespace vectorized;
class RepeatOperator : public Operator {
public:
RepeatOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence,
const std::vector<std::set<SlotId>>& slot_id_set_list, const std::set<SlotId>& all_slot_ids,
const std::vector<std::vector<SlotId>>& null_slot_ids, const std::vector<int64_t>& repeat_id_list,
uint64_t repeat_times_required, uint64_t repeat_times_last, const ColumnPtr& column_null,
const std::vector<std::vector<ColumnPtr>>& grouping_columns,
const std::vector<std::vector<int64_t>>& grouping_list, TupleId output_tuple_id,
const std::vector<std::vector<SlotId>>& null_slot_ids, uint64_t repeat_times_required,
uint64_t repeat_times_last, const std::vector<std::vector<int64_t>>& grouping_list,
const TupleDescriptor* tuple_desc, const std::vector<ExprContext*>& conjunct_ctxs)
: Operator(factory, id, "repeat", plan_node_id, driver_sequence),
_slot_id_set_list(slot_id_set_list),
_all_slot_ids(all_slot_ids),
_null_slot_ids(null_slot_ids),
_repeat_id_list(repeat_id_list),
_repeat_times_required(repeat_times_required),
_repeat_times_last(repeat_times_last),
_column_null(column_null),
_grouping_columns(grouping_columns),
_grouping_list(grouping_list),
_output_tuple_id(output_tuple_id),
_tuple_desc(tuple_desc),
_conjunct_ctxs(conjunct_ctxs) {}
~RepeatOperator() override = default;
Expand Down Expand Up @@ -68,43 +59,24 @@ class RepeatOperator : public Operator {
ChunkPtr _curr_chunk;

/*
* _slot_id_set_list
* _all_slot_ids
* _null_slot_ids
* _repeat_id_list
* _repeat_times_required
* _repeat_times_last
* _column_null
* _grouping_columns
* _grouping_list
* _output_tuple_id
* _tuple_desc
*
* This 11 fields is referenced from factory, and that is moved from RepeatNode.
*/

// Slot id set used to indicate those slots need to set to null.
const std::vector<std::set<SlotId>>& _slot_id_set_list;
// all slot id
const std::set<SlotId>& _all_slot_ids;
const std::vector<std::vector<SlotId>>& _null_slot_ids;
// An integer bitmap list, it indicates the bit position of the exprs not null.
const std::vector<int64_t>& _repeat_id_list;
// needed repeat times
const uint64_t _repeat_times_required;
// repeat timer for chunk. 0 <= _repeat_times_last < _repeat_times_required.
uint64_t _repeat_times_last;
// only null columns for reusing, It has chunk_size rows.
const ColumnPtr& _column_null;
// column for grouping_id and virtual columns for grouping()/grouping_id() for reusing.
// It has chunk_size rows.
const std::vector<std::vector<ColumnPtr>>& _grouping_columns;
// _grouping_list for grouping_id'value and grouping()/grouping_id()'s value.
// It's a two dimensional array.
// first is grouping index and second is repeat index.
const std::vector<std::vector<int64_t>>& _grouping_list;
// Tulple id used for output, it has new slots.
const TupleId _output_tuple_id;
const TupleDescriptor* _tuple_desc;

// used for expr's compute.
Expand Down Expand Up @@ -132,17 +104,15 @@ class RepeatOperatorFactory final : public OperatorFactory {
_column_null(std::move(column_null)),
_grouping_columns(std::move(grouping_columns)),
_grouping_list(std::move(grouping_list)),
_output_tuple_id(output_tuple_id),
_tuple_desc(tuple_desc),
_conjunct_ctxs(std::move(conjunct_ctxs)) {}

~RepeatOperatorFactory() override = default;

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<RepeatOperator>(this, _id, _plan_node_id, driver_sequence, _slot_id_set_list,
_all_slot_ids, _null_slot_ids, _repeat_id_list, _repeat_times_required,
_repeat_times_last, _column_null, _grouping_columns, _grouping_list,
_output_tuple_id, _tuple_desc, _conjunct_ctxs);
return std::make_shared<RepeatOperator>(this, _id, _plan_node_id, driver_sequence, _null_slot_ids,
_repeat_times_required, _repeat_times_last, _grouping_list, _tuple_desc,
_conjunct_ctxs);
}

private:
Expand All @@ -156,7 +126,6 @@ class RepeatOperatorFactory final : public OperatorFactory {
ColumnPtr _column_null;
std::vector<std::vector<ColumnPtr>> _grouping_columns;
std::vector<std::vector<int64_t>> _grouping_list;
TupleId _output_tuple_id;
const TupleDescriptor* _tuple_desc;
std::vector<ExprContext*> _conjunct_ctxs;
};
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/dict_decode_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ class DictDecodeOperatorFactory final : public OperatorFactory {
_dict_optimize_parser);
}

Status prepare(RuntimeState* state);
Status prepare(RuntimeState* state) override;

void close(RuntimeState* state);
void close(RuntimeState* state) override;

private:
std::vector<int32_t> _encode_column_cids;
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,10 @@ std::shared_ptr<ExchangeSinkOperatorFactory> _create_exchange_sink_operator(Pipe
return exchange_sink;
}

DIAGNOSTIC_PUSH
#if defined(__clang__)
DIAGNOSTIC_IGNORE("-Wpotentially-evaluated-expression")
#endif
Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_state, PipelineBuilderContext* context,
const UnifiedExecPlanFragmentParams& request,
std::unique_ptr<starrocks::DataSink>& datasink,
Expand Down Expand Up @@ -807,5 +811,6 @@ Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_

return Status::OK();
}
DIAGNOSTIC_POP

} // namespace starrocks::pipeline
2 changes: 0 additions & 2 deletions be/src/exec/pipeline/olap_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class OlapTableSinkOperatorFactory final : public OperatorFactory {
_sink0(down_cast<starrocks::stream_load::OlapTableSink*>(_data_sink.get())),
_fragment_ctx(fragment_ctx),
_cur_sender_id(start_sender_id),
_tablet_sink_dop(tablet_sink_dop),
_sinks(std::move(tablet_sinks)) {}

~OlapTableSinkOperatorFactory() override = default;
Expand All @@ -82,7 +81,6 @@ class OlapTableSinkOperatorFactory final : public OperatorFactory {
starrocks::stream_load::OlapTableSink* _sink0;
FragmentContext* const _fragment_ctx;
int32_t _cur_sender_id;
size_t _tablet_sink_dop;
std::vector<std::unique_ptr<starrocks::stream_load::OlapTableSink>> _sinks;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class PipelineDriver {
PipelineDriver(const PipelineDriver& driver)
: PipelineDriver(driver._operators, driver._query_ctx, driver._fragment_ctx, driver._driver_id) {}

~PipelineDriver();
~PipelineDriver() noexcept;

QueryContext* query_ctx() { return _query_ctx; }
const QueryContext* query_ctx() const { return _query_ctx; }
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace starrocks::pipeline {
GlobalDriverExecutor::GlobalDriverExecutor(const std::string& name, std::unique_ptr<ThreadPool> thread_pool,
bool enable_resource_group)
: Base(name),
_enable_resource_group(enable_resource_group),
_driver_queue(enable_resource_group ? std::unique_ptr<DriverQueue>(std::make_unique<WorkGroupDriverQueue>())
: std::make_unique<QuerySharedDriverQueue>()),
_thread_pool(std::move(thread_pool)),
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr

private:
LimitSetter _num_threads_setter;
const bool _enable_resource_group;
std::unique_ptr<DriverQueue> _driver_queue;
// _thread_pool must be placed after _driver_queue, because worker threads in _thread_pool use _driver_queue.
std::unique_ptr<ThreadPool> _thread_pool;
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ConnectorChunkSource final : public ChunkSource {
const workgroup::WorkGroupScanSchedEntity* _scan_sched_entity(const workgroup::WorkGroup* wg) const override;

connector::DataSourcePtr _data_source;
vectorized::ConnectorScanNode* _scan_node;
[[maybe_unused]] vectorized::ConnectorScanNode* _scan_node;
const int64_t _limit; // -1: no limit
const std::vector<ExprContext*>& _runtime_in_filters;
const vectorized::RuntimeFilterProbeCollector* _runtime_bloom_filters;
Expand All @@ -95,7 +95,6 @@ class ConnectorChunkSource final : public ChunkSource {
bool _opened = false;
bool _closed = false;
uint64_t _rows_read = 0;
uint64_t _bytes_read = 0;
};

} // namespace pipeline
Expand Down
3 changes: 0 additions & 3 deletions be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class OlapChunkSource final : public ChunkSource {
// projection iterator, doing the job of choosing |_scanner_columns| from |_reader_columns|.
std::shared_ptr<vectorized::ChunkIterator> _prj_iter;

const std::vector<std::string>* _unused_output_columns = nullptr;
std::unordered_set<uint32_t> _unused_output_column_ids;

// slot descriptors for each one of |output_columns|.
Expand All @@ -103,8 +102,6 @@ class OlapChunkSource final : public ChunkSource {

RuntimeProfile::Counter* _expr_filter_timer = nullptr;
RuntimeProfile::Counter* _create_seg_iter_timer = nullptr;
RuntimeProfile::Counter* _tablet_counter = nullptr;
RuntimeProfile::Counter* _reader_init_timer = nullptr;
RuntimeProfile::Counter* _io_timer = nullptr;
RuntimeProfile::Counter* _read_compressed_counter = nullptr;
RuntimeProfile::Counter* _decompress_timer = nullptr;
Expand Down
4 changes: 1 addition & 3 deletions be/src/exec/pipeline/scan/olap_meta_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class OlapMetaScanContextFactory {
public:
OlapMetaScanContextFactory(vectorized::OlapMetaScanNode* const scan_node, int32_t dop, bool shared_morsel_queue,
ChunkBufferLimiterPtr chunk_buffer_limiter)
: _meta_scan_node(scan_node),
_dop(dop),
: _dop(dop),
_shared_morsel_queue(shared_morsel_queue),
_chunk_buffer(BalanceStrategy::kDirect, dop, std::move(chunk_buffer_limiter)),
_contexts(shared_morsel_queue ? 1 : dop) {}
Expand All @@ -68,7 +67,6 @@ class OlapMetaScanContextFactory {
}

private:
vectorized::OlapMetaScanNode* const _meta_scan_node;
const int32_t _dop;
const bool _shared_morsel_queue;
BalancedChunkBuffer _chunk_buffer;
Expand Down
30 changes: 10 additions & 20 deletions be/src/exec/pipeline/sort/local_partition_topn_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,19 @@

namespace starrocks::pipeline {

LocalPartitionTopnContext::LocalPartitionTopnContext(
const std::vector<TExpr>& t_partition_exprs, const std::vector<ExprContext*>& sort_exprs,
std::vector<bool> is_asc_order, std::vector<bool> is_null_first, std::string sort_keys, int64_t offset,
int64_t partition_limit, const TTopNType::type topn_type, const std::vector<OrderByType>& order_by_types,
TupleDescriptor* materialized_tuple_desc, const RowDescriptor& parent_node_row_desc,
const RowDescriptor& parent_node_child_row_desc)
LocalPartitionTopnContext::LocalPartitionTopnContext(const std::vector<TExpr>& t_partition_exprs,
const std::vector<ExprContext*>& sort_exprs,
std::vector<bool> is_asc_order, std::vector<bool> is_null_first,
std::string sort_keys, int64_t offset, int64_t partition_limit,
const TTopNType::type topn_type)
: _t_partition_exprs(t_partition_exprs),
_sort_exprs(sort_exprs),
_is_asc_order(std::move(is_asc_order)),
_is_null_first(std::move(is_null_first)),
_sort_keys(std::move(sort_keys)),
_offset(offset),
_partition_limit(partition_limit),
_topn_type(topn_type),
_order_by_types(order_by_types),
_materialized_tuple_desc(materialized_tuple_desc),
_parent_node_row_desc(parent_node_row_desc),
_parent_node_child_row_desc(parent_node_child_row_desc) {}
_topn_type(topn_type) {}

Status LocalPartitionTopnContext::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_partition_exprs, &_partition_exprs));
Expand Down Expand Up @@ -143,20 +138,15 @@ LocalPartitionTopnContextFactory::LocalPartitionTopnContextFactory(
_sort_keys(std::move(sort_keys)),
_offset(offset),
_partition_limit(partition_limit),
_topn_type(topn_type),
_order_by_types(order_by_types),
_materialized_tuple_desc(materialized_tuple_desc),
_parent_node_row_desc(parent_node_row_desc),
_parent_node_child_row_desc(parent_node_child_row_desc) {}
_topn_type(topn_type) {}

LocalPartitionTopnContext* LocalPartitionTopnContextFactory::create(int32_t driver_sequence) {
DCHECK_LT(driver_sequence, _ctxs.size());

if (_ctxs[driver_sequence] == nullptr) {
_ctxs[driver_sequence] = std::make_shared<LocalPartitionTopnContext>(
_t_partition_exprs, _sort_exprs, _is_asc_order, _is_null_first, _sort_keys, _offset, _partition_limit,
_topn_type, _order_by_types, _materialized_tuple_desc, _parent_node_row_desc,
_parent_node_child_row_desc);
_ctxs[driver_sequence] = std::make_shared<LocalPartitionTopnContext>(_t_partition_exprs, _sort_exprs,
_is_asc_order, _is_null_first, _sort_keys,
_offset, _partition_limit, _topn_type);
}

return _ctxs[driver_sequence].get();
Expand Down
13 changes: 1 addition & 12 deletions be/src/exec/pipeline/sort/local_partition_topn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ class LocalPartitionTopnContext {
public:
LocalPartitionTopnContext(const std::vector<TExpr>& t_partition_exprs, const std::vector<ExprContext*>& sort_exprs,
std::vector<bool> is_asc_order, std::vector<bool> is_null_first, std::string sort_keys,
int64_t offset, int64_t partition_limit, const TTopNType::type topn_type,
const std::vector<OrderByType>& order_by_types, TupleDescriptor* materialized_tuple_desc,
const RowDescriptor& parent_node_row_desc,
const RowDescriptor& parent_node_child_row_desc);
int64_t offset, int64_t partition_limit, const TTopNType::type topn_type);

Status prepare(RuntimeState* state);

Expand Down Expand Up @@ -80,10 +77,6 @@ class LocalPartitionTopnContext {
int64_t _offset;
int64_t _partition_limit;
const TTopNType::type _topn_type;
const std::vector<OrderByType>& _order_by_types;
TupleDescriptor* _materialized_tuple_desc;
const RowDescriptor& _parent_node_row_desc;
const RowDescriptor& _parent_node_child_row_desc;

int32_t _sorter_index = 0;
};
Expand Down Expand Up @@ -117,9 +110,5 @@ class LocalPartitionTopnContextFactory {
int64_t _offset;
int64_t _partition_limit;
const TTopNType::type _topn_type;
const std::vector<OrderByType>& _order_by_types;
TupleDescriptor* _materialized_tuple_desc;
const RowDescriptor& _parent_node_row_desc;
const RowDescriptor& _parent_node_child_row_desc;
};
} // namespace starrocks::pipeline
1 change: 0 additions & 1 deletion be/src/exec/vectorized/chunks_sorter_heap_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ class ChunksSorterHeapSort final : public ChunksSorter {
template <PrimitiveType TYPE>
void _do_filter_data_for_type(detail::ChunkHolder* chunk_holder, Column::Filter* filter, int row_sz);

bool _init_status = false;
using CursorContainer = std::vector<detail::ChunkRowCursor>;
using CommonCursorSortHeap =
detail::SortingHeap<detail::ChunkRowCursor, CursorContainer, detail::ChunkCursorComparator>;
Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/vectorized/json_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ class JsonDocumentStreamParser : public JsonParser {
Status advance() noexcept override;

private:
uint8_t* _data = nullptr;

// data is parsed as a document stream.

// iterator context for document stream.
Expand Down Expand Up @@ -63,8 +61,6 @@ class JsonArrayParser : public JsonParser {
Status advance() noexcept override;

private:
uint8_t* _data = nullptr;

// data is parsed as a document in array type.
simdjson::ondemand::document _doc;

Expand Down
22 changes: 2 additions & 20 deletions be/src/exec/vectorized/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
}

void ParquetReaderWrap::close() {
_parquet->Close();
[[maybe_unused]] auto st = _parquet->Close();
}

Status ParquetReaderWrap::size(int64_t* size) {
Expand Down Expand Up @@ -279,29 +279,11 @@ using ArrowStatusCode = ::arrow::StatusCode;
using StarRocksStatus = ::starrocks::Status;
using ArrowStatus = ::arrow::Status;

static inline ArrowStatusCode convert_status_code(StarRocksStatusCode code) {
switch (code) {
case StarRocksStatusCode::OK:
return ArrowStatusCode::OK;
case StarRocksStatusCode::NOT_FOUND:
case StarRocksStatusCode::END_OF_FILE:
return ArrowStatusCode::IOError;
case StarRocksStatusCode::NOT_IMPLEMENTED_ERROR:
return ArrowStatusCode::NotImplemented;
case StarRocksStatusCode::MEM_ALLOC_FAILED:
case StarRocksStatusCode::BUFFER_ALLOCATION_FAILED:
case StarRocksStatusCode::MEM_LIMIT_EXCEEDED:
return ArrowStatusCode::OutOfMemory;
default:
return ArrowStatusCode::ExecutionError;
}
}

ParquetChunkFile::ParquetChunkFile(std::shared_ptr<starrocks::RandomAccessFile> file, uint64_t pos)
: _file(std::move(file)), _pos(pos) {}

ParquetChunkFile::~ParquetChunkFile() {
Close();
[[maybe_unused]] auto st = Close();
}

arrow::Status ParquetChunkFile::Close() {
Expand Down
Loading

0 comments on commit d9e8c27

Please sign in to comment.