diff --git a/be/src/common/config.h b/be/src/common/config.h index 0367c73f5d521..cb11e51540641 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -928,7 +928,6 @@ CONF_mBool(parquet_coalesce_read_enable, "true"); CONF_Bool(parquet_late_materialization_enable, "true"); CONF_Bool(parquet_page_index_enable, "true"); CONF_mBool(parquet_statistics_process_more_filter_enable, "true"); -CONF_mBool(parquet_advance_zonemap_filter, "true"); CONF_Int32(io_coalesce_read_max_buffer_size, "8388608"); CONF_Int32(io_coalesce_read_max_distance_size, "1048576"); diff --git a/be/src/exec/hdfs_scanner.cpp b/be/src/exec/hdfs_scanner.cpp index 30db96050e70f..b22273b9aea17 100644 --- a/be/src/exec/hdfs_scanner.cpp +++ b/be/src/exec/hdfs_scanner.cpp @@ -175,25 +175,24 @@ Status HdfsScanner::_build_scanner_context() { ctx.connector_max_split_size = _scanner_params.connector_max_split_size; ctx.global_dictmaps = _scanner_params.global_dictmaps; - if (config::parquet_advance_zonemap_filter) { - ScanConjunctsManagerOptions opts; - opts.conjunct_ctxs_ptr = &_scanner_params.all_conjunct_ctxs; - opts.tuple_desc = _scanner_params.tuple_desc; - opts.obj_pool = _runtime_state->obj_pool(); - opts.runtime_filters = _scanner_params.runtime_filter_collector; - opts.runtime_state = _runtime_state; - opts.enable_column_expr_predicate = true; - opts.is_olap_scan = false; - opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params(); - ctx.conjuncts_manager = std::make_unique(std::move(opts)); - RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts()); - auto* predicate_parser = - opts.obj_pool->add(new ConnectorPredicateParser(&_scanner_params.tuple_desc->decoded_slots())); - ASSIGN_OR_RETURN(ctx.predicate_tree, - ctx.conjuncts_manager->get_predicate_tree(predicate_parser, ctx.predicate_free_pool)); - ctx.rf_scan_range_pruner = opts.obj_pool->add( - new RuntimeScanRangePruner(predicate_parser, ctx.conjuncts_manager->unarrived_runtime_filters())); - } + ScanConjunctsManagerOptions opts; + opts.conjunct_ctxs_ptr = &_scanner_params.all_conjunct_ctxs; + opts.tuple_desc = _scanner_params.tuple_desc; + opts.obj_pool = _runtime_state->obj_pool(); + opts.runtime_filters = _scanner_params.runtime_filter_collector; + opts.runtime_state = _runtime_state; + opts.enable_column_expr_predicate = true; + opts.is_olap_scan = false; + opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params(); + ctx.conjuncts_manager = std::make_unique(std::move(opts)); + RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts()); + auto* predicate_parser = + opts.obj_pool->add(new ConnectorPredicateParser(&_scanner_params.tuple_desc->decoded_slots())); + ASSIGN_OR_RETURN(ctx.predicate_tree, + ctx.conjuncts_manager->get_predicate_tree(predicate_parser, ctx.predicate_free_pool)); + ctx.rf_scan_range_pruner = opts.obj_pool->add( + new RuntimeScanRangePruner(predicate_parser, ctx.conjuncts_manager->unarrived_runtime_filters())); + return Status::OK(); } diff --git a/be/src/formats/CMakeLists.txt b/be/src/formats/CMakeLists.txt index 280d9e54fe7ac..d864bdc070f06 100644 --- a/be/src/formats/CMakeLists.txt +++ b/be/src/formats/CMakeLists.txt @@ -66,7 +66,6 @@ add_library(Formats STATIC parquet/schema.cpp parquet/stored_column_reader.cpp parquet/stored_column_reader_with_index.cpp - parquet/page_index_reader.cpp parquet/utils.cpp parquet/metadata.cpp parquet/meta_helper.cpp diff --git a/be/src/formats/parquet/column_reader.cpp b/be/src/formats/parquet/column_reader.cpp index 8ce0e49a71913..8de14b5303d9d 100644 --- a/be/src/formats/parquet/column_reader.cpp +++ b/be/src/formats/parquet/column_reader.cpp @@ -17,8 +17,6 @@ #include #include -#include -#include #include #include "column/chunk.h" @@ -36,6 +34,18 @@ namespace starrocks::parquet { +void ColumnOffsetIndexCtx::collect_io_range(std::vector* ranges, + int64_t* end_offset, bool active) { + for (size_t i = 0; i < page_selected.size(); i++) { + if (page_selected[i]) { + auto r = io::SharedBufferedInputStream::IORange( + offset_index.page_locations[i].offset, offset_index.page_locations[i].compressed_page_size, active); + ranges->emplace_back(r); + *end_offset = std::max(*end_offset, r.offset + r.size); + } + } +} + Status ColumnDictFilterContext::rewrite_conjunct_ctxs_to_predicate(StoredColumnReader* reader, bool* is_group_filtered) { // create dict value chunk for evaluation. diff --git a/be/src/formats/parquet/column_reader.h b/be/src/formats/parquet/column_reader.h index 9af20fa8bcda7..8654a30f8c585 100644 --- a/be/src/formats/parquet/column_reader.h +++ b/be/src/formats/parquet/column_reader.h @@ -57,6 +57,20 @@ struct ParquetField; namespace starrocks::parquet { +struct ColumnOffsetIndexCtx { + tparquet::OffsetIndex offset_index; + std::vector page_selected; + uint64_t rg_first_row; + + void collect_io_range(std::vector* ranges, int64_t* end_offset, + bool active); + + // be compatible with PARQUET-1850 + bool check_dictionary_page(int64_t data_page_offset) { + return offset_index.page_locations.size() > 0 && offset_index.page_locations[0].offset > data_page_offset; + } +}; + struct ColumnReaderOptions { std::string timezone; bool case_sensitive = false; diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp index d19dffba9fc2b..df9f56add1ef2 100644 --- a/be/src/formats/parquet/file_reader.cpp +++ b/be/src/formats/parquet/file_reader.cpp @@ -177,212 +177,28 @@ Status FileReader::_build_split_tasks() { return Status::OK(); } -bool FileReader::_filter_group_with_min_max_conjuncts(const GroupReaderPtr& group_reader) { - // filter by min/max conjunct ctxs. - if (!_scanner_ctx->min_max_conjunct_ctxs.empty()) { - const TupleDescriptor& tuple_desc = *(_scanner_ctx->min_max_tuple_desc); - ChunkPtr min_chunk = ChunkHelper::new_chunk(tuple_desc, 0); - ChunkPtr max_chunk = ChunkHelper::new_chunk(tuple_desc, 0); - - auto st = _read_min_max_chunk(group_reader, tuple_desc.slots(), &min_chunk, &max_chunk); - if (!st.ok()) { - // if there are some error when dealing statistics, shouldn't return the error status, - // just read data ignore the statistics. - return false; - } - - for (auto& min_max_conjunct_ctx : _scanner_ctx->min_max_conjunct_ctxs) { - auto res_min = min_max_conjunct_ctx->evaluate(min_chunk.get()); - auto res_max = min_max_conjunct_ctx->evaluate(max_chunk.get()); - if (!res_min.ok() || !res_max.ok()) { - // maybe one of the conjuncts encounter error when dealing statistics, just ignore it and continue - continue; - } - const auto& min_column = res_min.value(); - const auto& max_column = res_max.value(); - auto f = [&](Column* c) { - // is_null(0) only when something unexpected happens - if (c->is_null(0)) return (int8_t)0; - return c->get(0).get_int8(); - }; - auto min = f(min_column.get()); - auto max = f(max_column.get()); - if (min == 0 && max == 0) { - return true; - } - } - } - return false; -} - -bool FileReader::_filter_group_with_bloom_filter_min_max_conjuncts(const GroupReaderPtr& group_reader) { - // filter by min/max in runtime filter. - if (_scanner_ctx->runtime_filter_collector) { - std::vector min_max_slots(1); - - const std::vector& slots = _scanner_ctx->slot_descs; - - for (auto& it : _scanner_ctx->runtime_filter_collector->descriptors()) { - RuntimeFilterProbeDescriptor* rf_desc = it.second; - // external node won't have colocate runtime filter - const JoinRuntimeFilter* filter = rf_desc->runtime_filter(-1); - SlotId probe_slot_id; - if (filter == nullptr || !rf_desc->is_probe_slot_ref(&probe_slot_id)) continue; - // !!linear search slot by slot_id. - SlotDescriptor* slot = nullptr; - for (SlotDescriptor* s : slots) { - if (s->id() == probe_slot_id) { - slot = s; - break; - } - } - if (!slot) continue; - min_max_slots[0] = slot; - - if (filter->has_null()) { - std::vector has_nulls; - auto st = _read_has_nulls(group_reader, min_max_slots, &has_nulls); - if (!st.ok()) continue; - - if (has_nulls[0]) { - continue; - } - } - - { - ChunkPtr min_chunk = ChunkHelper::new_chunk(min_max_slots, 0); - ChunkPtr max_chunk = ChunkHelper::new_chunk(min_max_slots, 0); - - auto st = _read_min_max_chunk(group_reader, min_max_slots, &min_chunk, &max_chunk); - if (!st.ok()) continue; - bool discard = RuntimeFilterHelper::filter_zonemap_with_min_max( - slot->type().type, filter, min_chunk->columns()[0].get(), max_chunk->columns()[0].get()); - if (discard) { - return true; - } - } - } - } - return false; -} - -bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_reader) { - // runtime_in_filter, the sql-original in_filter and is_null/not_null filter will be in - // _scanner_ctx->conjunct_ctxs_by_slot - for (const auto& kv : _scanner_ctx->conjunct_ctxs_by_slot) { - StatisticsHelper::StatSupportedFilter filter_type; - for (auto ctx : kv.second) { - if (StatisticsHelper::can_be_used_for_statistics_filter(ctx, filter_type)) { - SlotDescriptor* slot = nullptr; - for (auto s : _scanner_ctx->slot_descs) { - if (s->id() == kv.first) { - slot = s; - } - } - - if (UNLIKELY(slot == nullptr)) { - // it shouldn't be here, just some defensive code - DCHECK(false) << "couldn't find slot id " << kv.first << " in tuple desc"; - LOG(WARNING) << "couldn't find slot id " << kv.first << " in tuple desc"; - continue; - } - const tparquet::ColumnMetaData* column_meta = nullptr; - const tparquet::ColumnChunk* column_chunk = group_reader->get_chunk_metadata(slot->id()); - if (column_chunk && column_chunk->__isset.meta_data) { - column_meta = &column_chunk->meta_data; - } - if (column_meta == nullptr || !column_meta->__isset.statistics) continue; - if (filter_type == StatisticsHelper::StatSupportedFilter::IS_NULL) { - if (!column_meta->statistics.__isset.null_count) continue; - if (column_meta->statistics.null_count == 0) { - return true; - } - } else if (filter_type == StatisticsHelper::StatSupportedFilter::IS_NOT_NULL) { - if (!column_meta->statistics.__isset.null_count) continue; - if (column_meta->statistics.null_count == group_reader->get_row_group_metadata()->num_rows) { - return true; - } - } else if (filter_type == StatisticsHelper::StatSupportedFilter::FILTER_IN) { - if (!column_meta->statistics.__isset.null_count) continue; - - std::vector min_values; - std::vector max_values; - std::vector null_counts; - std::vector null_pages; - int64_t num_rows = group_reader->get_row_group_metadata()->num_rows; - - const ParquetField* field = group_reader->get_column_parquet_field(slot->id()); - if (field == nullptr) { - LOG(WARNING) << "Can't get " + slot->col_name() + "'s ParquetField in _read_min_max_chunk."; - continue; - } - Status st; - - null_counts.emplace_back(column_meta->statistics.null_count); - null_pages.emplace_back(num_rows == column_meta->statistics.null_count); - if (num_rows == column_meta->statistics.null_count) { - min_values.emplace_back(""); - max_values.emplace_back(""); - } else { - st = StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field, - min_values, max_values); - if (!st.ok()) continue; - } - - Filter selected(min_values.size(), 1); - st = StatisticsHelper::in_filter_on_min_max_stat(min_values, max_values, null_pages, null_counts, - ctx, field, _scanner_ctx->timezone, selected); - if (!st.ok()) continue; - if (!selected[0]) { - return true; - } - } else if (filter_type == StatisticsHelper::StatSupportedFilter::RF_MIN_MAX) { - // already process in `_filter_group_with_bloom_filter_min_max_conjuncts`. - } - } - } - } - return false; -} - // when doing row group filter, there maybe some error, but we'd better just ignore it instead of returning the error // status and lead to the query failed. bool FileReader::_filter_group(const GroupReaderPtr& group_reader) { - if (config::parquet_advance_zonemap_filter) { - if (_scanner_ctx->rf_scan_range_pruner != nullptr) { - _rf_scan_range_pruner = std::make_shared(*_scanner_ctx->rf_scan_range_pruner); - } - auto res = _scanner_ctx->predicate_tree.visit( - ZoneMapEvaluator{_scanner_ctx->predicate_tree, group_reader.get()}); - if (!res.ok()) { - LOG(WARNING) << "filter row group failed: " << res.status().message(); - return false; - } - if (res.value().has_value() && res.value()->span_size() == 0) { - // no rows selected, the whole row group can be filtered - return true; - } - return false; - } else { - if (_filter_group_with_min_max_conjuncts(group_reader)) { - return true; - } - - if (_filter_group_with_bloom_filter_min_max_conjuncts(group_reader)) { - return true; - } - - if (config::parquet_statistics_process_more_filter_enable && _filter_group_with_more_filter(group_reader)) { - return true; - } - + if (_scanner_ctx->rf_scan_range_pruner != nullptr) { + _rf_scan_range_pruner = std::make_shared(*_scanner_ctx->rf_scan_range_pruner); + } + auto res = _scanner_ctx->predicate_tree.visit( + ZoneMapEvaluator{_scanner_ctx->predicate_tree, group_reader.get()}); + if (!res.ok()) { + LOG(WARNING) << "filter row group failed: " << res.status().message(); return false; } + if (res.value().has_value() && res.value()->span_size() == 0) { + // no rows selected, the whole row group can be filtered + return true; + } + return false; } StatusOr FileReader::_update_rf_and_filter_group(const GroupReaderPtr& group_reader) { bool filter = false; - if (config::parquet_advance_zonemap_filter && _rf_scan_range_pruner != nullptr) { + if (_rf_scan_range_pruner != nullptr) { RETURN_IF_ERROR(_rf_scan_range_pruner->update_range_if_arrived( _scanner_ctx->global_dictmaps, [&filter, &group_reader](auto cid, const PredicateList& predicates) { @@ -403,108 +219,6 @@ StatusOr FileReader::_update_rf_and_filter_group(const GroupReaderPtr& gro return filter; } -Status FileReader::_read_has_nulls(const GroupReaderPtr& group_reader, const std::vector& slots, - std::vector* has_nulls) { - const HdfsScannerContext& ctx = *_scanner_ctx; - - for (size_t i = 0; i < slots.size(); i++) { - const SlotDescriptor* slot = slots[i]; - const tparquet::ColumnMetaData* column_meta = nullptr; - const tparquet::ColumnChunk* column_chunk = group_reader->get_chunk_metadata(slot->id()); - if (column_chunk && column_chunk->__isset.meta_data) { - column_meta = &column_chunk->meta_data; - } - if (column_meta == nullptr) { - int col_idx = _get_partition_column_idx(slot->col_name()); - if (col_idx < 0) { - // column not exist in parquet file - (*has_nulls).emplace_back(true); - } else { - // is partition column - auto* const_column = ColumnHelper::as_raw_column(ctx.partition_values[col_idx]); - ColumnPtr data_column = const_column->data_column(); - if (data_column->is_nullable()) { - (*has_nulls).emplace_back(true); - } else { - (*has_nulls).emplace_back(false); - } - } - } else if (!column_meta->__isset.statistics) { - // statistics not exist in parquet file - return Status::Aborted("No exist statistics"); - } else { - RETURN_IF_ERROR(StatisticsHelper::get_has_nulls(column_meta, *has_nulls)); - } - } - - return Status::OK(); -} - -Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const std::vector& slots, - ChunkPtr* min_chunk, ChunkPtr* max_chunk) const { - const HdfsScannerContext& ctx = *_scanner_ctx; - - for (size_t i = 0; i < slots.size(); i++) { - const SlotDescriptor* slot = slots[i]; - const tparquet::ColumnMetaData* column_meta = nullptr; - const tparquet::ColumnChunk* column_chunk = group_reader->get_chunk_metadata(slot->id()); - if (column_chunk && column_chunk->__isset.meta_data) { - column_meta = &column_chunk->meta_data; - } - if (column_meta == nullptr) { - int col_idx = _get_partition_column_idx(slot->col_name()); - if (col_idx < 0) { - // column not exist in parquet file - (*min_chunk)->columns()[i]->append_nulls(1); - (*max_chunk)->columns()[i]->append_nulls(1); - } else { - // is partition column - auto* const_column = ColumnHelper::as_raw_column(ctx.partition_values[col_idx]); - ColumnPtr data_column = const_column->data_column(); - if (data_column->is_nullable()) { - (*min_chunk)->columns()[i]->append_nulls(1); - (*max_chunk)->columns()[i]->append_nulls(1); - } else { - (*min_chunk)->columns()[i]->append(*data_column, 0, 1); - (*max_chunk)->columns()[i]->append(*data_column, 0, 1); - } - } - } else if (!column_meta->__isset.statistics) { - // statistics not exist in parquet file - return Status::Aborted("No exist statistics"); - } else { - size_t num_rows = group_reader->get_row_group_metadata()->num_rows; - std::vector min_values; - std::vector max_values; - std::vector null_pages; - - // If all values of one group is null, the statistics is like this: - // max=, min=, null_count=3, distinct_count=, max_value=, min_value= - if (column_meta->statistics.__isset.null_count && column_meta->statistics.null_count == num_rows) { - (*min_chunk)->columns()[i]->append_nulls(1); - (*max_chunk)->columns()[i]->append_nulls(1); - continue; - } - - const ParquetField* field = group_reader->get_column_parquet_field(slot->id()); - if (field == nullptr) { - LOG(WARNING) << "Can't get " + slot->col_name() + "'s ParquetField in _read_min_max_chunk."; - return Status::InternalError(strings::Substitute("Can't get $0 field", slot->col_name())); - } - - RETURN_IF_ERROR(StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field, - min_values, max_values)); - null_pages.emplace_back(false); - RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*min_chunk)->columns()[i], min_values, - null_pages, slot->type(), field, ctx.timezone)); - RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*max_chunk)->columns()[i], max_values, - null_pages, slot->type(), field, ctx.timezone)); - } - } - - return Status::OK(); -} - int32_t FileReader::_get_partition_column_idx(const std::string& col_name) const { for (int32_t i = 0; i < _scanner_ctx->partition_columns.size(); i++) { if (_scanner_ctx->partition_columns[i].name() == col_name) { diff --git a/be/src/formats/parquet/file_reader.h b/be/src/formats/parquet/file_reader.h index 05f16b81dcbb3..01f676713ba97 100644 --- a/be/src/formats/parquet/file_reader.h +++ b/be/src/formats/parquet/file_reader.h @@ -100,24 +100,11 @@ class FileReader { bool _filter_group(const GroupReaderPtr& group_reader); StatusOr _update_rf_and_filter_group(const GroupReaderPtr& group_reader); - bool _filter_group_with_min_max_conjuncts(const GroupReaderPtr& group_reader); - - bool _filter_group_with_bloom_filter_min_max_conjuncts(const GroupReaderPtr& group_reader); - - bool _filter_group_with_more_filter(const GroupReaderPtr& group_reader); - // get row group to read // if scan range conatain the first byte in the row group, will be read // TODO: later modify the larger block should be read bool _select_row_group(const tparquet::RowGroup& row_group); - // make min/max chunk from stats of row group meta - // exist=true: group meta contain statistics info - Status _read_min_max_chunk(const GroupReaderPtr& group_reader, const std::vector& slots, - ChunkPtr* min_chunk, ChunkPtr* max_chunk) const; - Status _read_has_nulls(const GroupReaderPtr& group_reader, const std::vector& slots, - std::vector* has_nulls); - // only scan partition column + not exist column Status _exec_no_materialized_column_scan(ChunkPtr* chunk); diff --git a/be/src/formats/parquet/group_reader.cpp b/be/src/formats/parquet/group_reader.cpp index d5ef1eaf0a488..c2ff8c7d39fe0 100644 --- a/be/src/formats/parquet/group_reader.cpp +++ b/be/src/formats/parquet/group_reader.cpp @@ -29,7 +29,6 @@ #include "exprs/expr_context.h" #include "formats/parquet/column_reader_factory.h" #include "formats/parquet/metadata.h" -#include "formats/parquet/page_index_reader.h" #include "formats/parquet/scalar_column_reader.h" #include "formats/parquet/schema.h" #include "formats/parquet/zone_map_filter_evaluator.h" @@ -80,29 +79,20 @@ Status GroupReader::_deal_with_pageindex() { if (config::parquet_page_index_enable) { SCOPED_RAW_TIMER(&_param.stats->page_index_ns); _param.stats->rows_before_page_index += _row_group_metadata->num_rows; - if (config::parquet_advance_zonemap_filter) { - ASSIGN_OR_RETURN(auto sparse_range, _param.predicate_tree->visit(ZoneMapEvaluator{ - *_param.predicate_tree, this})); - if (sparse_range.has_value()) { - if (sparse_range.value().empty()) { - // the whole row group has been filtered - _is_group_filtered = true; - } else if (sparse_range->span_size() < _row_group_metadata->num_rows) { - // some pages have been filtered - _range = sparse_range.value(); - for (const auto& pair : _column_readers) { - pair.second->select_offset_index(_range, _row_group_first_row); - } + + ASSIGN_OR_RETURN(auto sparse_range, _param.predicate_tree->visit(ZoneMapEvaluator{ + *_param.predicate_tree, this})); + if (sparse_range.has_value()) { + if (sparse_range.value().empty()) { + // the whole row group has been filtered + _is_group_filtered = true; + } else if (sparse_range->span_size() < _row_group_metadata->num_rows) { + // some pages have been filtered + _range = sparse_range.value(); + for (const auto& pair : _column_readers) { + pair.second->select_offset_index(_range, _row_group_first_row); } } - } else { - auto page_index_reader = - std::make_unique(this, _param.file, _column_readers, _row_group_metadata, - _param.min_max_conjunct_ctxs, _param.conjunct_ctxs_by_slot); - ASSIGN_OR_RETURN(bool flag, page_index_reader->generate_read_range(_range)); - if (flag && !_is_group_filtered) { - page_index_reader->select_column_offset_index(); - } } } diff --git a/be/src/formats/parquet/group_reader.h b/be/src/formats/parquet/group_reader.h index b0457a551f041..a89343d83baa3 100644 --- a/be/src/formats/parquet/group_reader.h +++ b/be/src/formats/parquet/group_reader.h @@ -106,11 +106,7 @@ struct GroupReaderParam { ColumnIdToGlobalDictMap* global_dictmaps = &EMPTY_GLOBAL_DICTMAPS; }; -class PageIndexReader; - class GroupReader { - friend class PageIndexReader; - public: GroupReader(GroupReaderParam& param, int row_group_number, SkipRowsContextPtr skip_rows_ctx, int64_t row_group_first_row); @@ -166,7 +162,6 @@ class GroupReader { const tparquet::RowGroup* _row_group_metadata = nullptr; int64_t _row_group_first_row = 0; SkipRowsContextPtr _skip_rows_ctx; - int64_t _raw_rows_read = 0; // column readers for column chunk in row group std::unordered_map> _column_readers; diff --git a/be/src/formats/parquet/page_index_reader.cpp b/be/src/formats/parquet/page_index_reader.cpp deleted file mode 100644 index 3e556d7a1100e..0000000000000 --- a/be/src/formats/parquet/page_index_reader.cpp +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "formats/parquet/page_index_reader.h" - -#include -#include -#include - -#include "column/chunk.h" -#include "column/column.h" -#include "column/column_helper.h" -#include "column/vectorized_fwd.h" -#include "common/compiler_util.h" -#include "common/config.h" -#include "common/status.h" -#include "exprs/expr.h" -#include "exprs/expr_context.h" -#include "formats/parquet/column_reader.h" -#include "formats/parquet/schema.h" -#include "formats/parquet/statistics_helper.h" -#include "gen_cpp/parquet_types.h" -#include "gutil/stringprintf.h" -#include "runtime/types.h" -#include "simd/simd.h" -#include "util/slice.h" -#include "util/thrift_util.h" - -namespace starrocks::parquet { - -void ColumnOffsetIndexCtx::collect_io_range(std::vector* ranges, - int64_t* end_offset, bool active) { - for (size_t i = 0; i < page_selected.size(); i++) { - if (page_selected[i]) { - auto r = io::SharedBufferedInputStream::IORange( - offset_index.page_locations[i].offset, offset_index.page_locations[i].compressed_page_size, active); - ranges->emplace_back(r); - *end_offset = std::max(*end_offset, r.offset + r.size); - } - } -} - -void PageIndexReader::_split_min_max_conjuncts_by_slot( - std::unordered_map>& slot_id_to_ctx_map) { - for (auto* ctx : _min_max_conjunct_ctxs) { - std::vector slot_ids; - ctx->root()->get_slot_ids(&slot_ids); - if (slot_ids.size() != 1) { - continue; - } - if (_column_readers.find(slot_ids[0]) == _column_readers.end()) { - continue; - } - - if (slot_id_to_ctx_map.find(slot_ids[0]) == slot_id_to_ctx_map.end()) { - slot_id_to_ctx_map.insert({slot_ids[0], std::vector({ctx})}); - } else { - slot_id_to_ctx_map[slot_ids[0]].emplace_back(ctx); - } - } -} - -bool PageIndexReader::_more_conjunct_for_statistics(SlotId id) { - if (!config::parquet_statistics_process_more_filter_enable) { - return false; - } - if (_conjunct_ctxs_by_slot.find(id) == _conjunct_ctxs_by_slot.end()) { - return false; - } - StatisticsHelper::StatSupportedFilter filter_type; - for (auto ctx : _conjunct_ctxs_by_slot.at(id)) { - if (StatisticsHelper::can_be_used_for_statistics_filter(ctx, filter_type)) { - return true; - } - } - return false; -} - -Status PageIndexReader::_deal_with_min_max_conjuncts(const std::vector& ctxs, - const tparquet::ColumnIndex& column_index, SlotId id, - const TypeDescriptor& type, Filter& page_filter) { - auto min_chunk = std::make_unique(); - ColumnPtr min_column = ColumnHelper::create_column(type, true); - min_chunk->append_column(min_column, id); - auto max_chunk = std::make_unique(); - ColumnPtr max_column = ColumnHelper::create_column(type, true); - max_chunk->append_column(max_column, id); - // deal with min_values - auto st = StatisticsHelper::decode_value_into_column(min_column, column_index.min_values, column_index.null_pages, - type, _column_readers.at(id)->get_column_parquet_field(), - _group_reader->_param.timezone); - if (!st.ok()) { - // swallow error status - LOG(INFO) << "Error when decode min/max statistics, slotid " << id << ", type " << type.debug_string(); - return Status::OK(); - } - - // deal with max_values - st = StatisticsHelper::decode_value_into_column(max_column, column_index.max_values, column_index.null_pages, type, - _column_readers.at(id)->get_column_parquet_field(), - _group_reader->_param.timezone); - if (!st.ok()) { - // swallow error status - LOG(INFO) << "Error when decode min/max statistics, slotid " << id << ", type " << type.debug_string(); - return Status::OK(); - } - - size_t page_num = column_index.min_values.size(); - // both min and max value are filtered, the page is filtered. - // for example pages {100, 200}, {200, 400}, {400, 600}, {500, 800}, {800, 1000} - // conjuncts is >= 300, <= 700 - // for >= 300, min_selected is {0, 0, 1, 1, 1}, max_selected is {0, 1, 1, 1, 1} - // min_selected or max_selected is {0, 1, 1, 1, 1} - // so the page_filter will be {0, 1, 1, 1, 1} - // for <= 700, min_selected is {1, 1, 1, 1, 0}, max_selected is {1, 1, 1, 0, 0} - // min_selected or max_selected is {1, 1, 1, 1, 0} - // so the page_filter will be {0, 1, 1, 1, 0} - for (auto* ctx : ctxs) { - ASSIGN_OR_RETURN(ColumnPtr min_selected, ctx->evaluate(min_chunk.get())); - ASSIGN_OR_RETURN(ColumnPtr max_selected, ctx->evaluate(max_chunk.get())); - auto unpack_min_selected = ColumnHelper::unpack_and_duplicate_const_column(page_num, min_selected); - auto unpack_max_selected = ColumnHelper::unpack_and_duplicate_const_column(page_num, max_selected); - Filter min_filter = ColumnHelper::merge_nullable_filter(unpack_min_selected.get()); - Filter max_filter = ColumnHelper::merge_nullable_filter(unpack_max_selected.get()); - ColumnHelper::or_two_filters(&min_filter, max_filter.data()); - ColumnHelper::merge_two_filters(&page_filter, min_filter.data()); - } - return Status::OK(); -} - -Status PageIndexReader::_deal_with_more_conjunct(const std::vector& ctxs, - const tparquet::ColumnIndex& column_index, - const tparquet::OffsetIndex& offset_index, const ParquetField* field, - const std::string& timezone, Filter& page_filter) { - if (!config::parquet_statistics_process_more_filter_enable) { - return Status::OK(); - } - - StatisticsHelper::StatSupportedFilter filter_type; - for (auto* ctx : ctxs) { - if (StatisticsHelper::can_be_used_for_statistics_filter(ctx, filter_type)) { - if (filter_type == StatisticsHelper::StatSupportedFilter::IS_NULL) { - DCHECK(field->max_rep_level() == 0); - if (column_index.__isset.null_counts) { - if (UNLIKELY(column_index.null_counts.size() != page_filter.size())) { - return Status::Aborted( - fmt::format("null_counts size doesn't match page size for {}", field->name)); - } - for (size_t i = 0; i < column_index.null_counts.size(); i++) { - page_filter[i] = column_index.null_counts[i] == 0 ? 0 : page_filter[i]; - } - } - } else if (filter_type == StatisticsHelper::StatSupportedFilter::IS_NOT_NULL) { - DCHECK(field->max_rep_level() == 0); - if (column_index.__isset.null_counts) { - if (UNLIKELY(column_index.null_counts.size() != page_filter.size())) { - return Status::Aborted( - fmt::format("null_counts size doesn't match page size for {}", field->name)); - } - for (size_t i = 0; i < column_index.null_counts.size(); i++) { - int64_t page_size = i == (offset_index.page_locations.size() - 1) - ? (_row_group_metadata->num_rows - - offset_index.page_locations[i].first_row_index) - : (offset_index.page_locations[i + 1].first_row_index - - offset_index.page_locations[i].first_row_index); - page_filter[i] = column_index.null_counts[i] == page_size ? 0 : page_filter[i]; - } - } - } else if (filter_type == StatisticsHelper::StatSupportedFilter::FILTER_IN) { - if (column_index.__isset.null_counts) { - RETURN_IF_ERROR(StatisticsHelper::in_filter_on_min_max_stat( - column_index.min_values, column_index.max_values, column_index.null_pages, - column_index.null_counts, ctx, field, timezone, page_filter)); - } - } else if (filter_type == StatisticsHelper::StatSupportedFilter::RF_MIN_MAX) { - if (column_index.__isset.null_counts) { - RETURN_IF_ERROR(StatisticsHelper::min_max_filter_on_min_max_stat( - column_index.min_values, column_index.max_values, column_index.null_pages, - column_index.null_counts, ctx, field, timezone, page_filter)); - } - } - } - } - return Status::OK(); -} - -StatusOr PageIndexReader::generate_read_range(SparseRange& sparse_range) { - // _min_max_conjunct_ctxs to map> - bool page_filtered_flag = false; - std::unordered_map> slot_id_to_min_max_ctx_map; - _split_min_max_conjuncts_by_slot(slot_id_to_min_max_ctx_map); - - for (int idx : _group_reader->_active_column_indices) { - const auto& column = _group_reader->_param.read_cols[idx]; - // complex type will be supported later - if (column.slot_type().is_complex_type()) { - continue; - } - SlotId slotId = column.slot_id(); - // no min_max conjunct - if (slot_id_to_min_max_ctx_map.find(slotId) == slot_id_to_min_max_ctx_map.end() && - !_more_conjunct_for_statistics(slotId)) { - continue; - } - - // no page index - const tparquet::ColumnChunk* chunk_meta = _column_readers.at(slotId)->get_chunk_metadata(); - if (!chunk_meta->__isset.column_index_offset || !chunk_meta->__isset.offset_index_offset || - !chunk_meta->__isset.meta_data) { - continue; - } - - // get column index - int64_t column_index_offset = chunk_meta->column_index_offset; - uint32_t column_index_length = chunk_meta->column_index_length; - - std::vector page_index_data; - page_index_data.reserve(column_index_length); - RETURN_IF_ERROR(_file->read_at_fully(column_index_offset, page_index_data.data(), column_index_length)); - - tparquet::ColumnIndex column_index; - RETURN_IF_ERROR(deserialize_thrift_msg(page_index_data.data(), &column_index_length, TProtocolType::COMPACT, - &column_index)); - - ASSIGN_OR_RETURN(const tparquet::OffsetIndex* offset_index, - _column_readers.at(slotId)->get_offset_index(_group_reader->_row_group_first_row)); - - size_t page_num = column_index.min_values.size(); - Filter page_filter(page_num, 1); - - if (slot_id_to_min_max_ctx_map.find(slotId) != slot_id_to_min_max_ctx_map.end()) { - RETURN_IF_ERROR(_deal_with_min_max_conjuncts(slot_id_to_min_max_ctx_map.at(slotId), column_index, slotId, - column.slot_type(), page_filter)); - } - if (SIMD::contain_nonzero(page_filter) && _conjunct_ctxs_by_slot.find(slotId) != _conjunct_ctxs_by_slot.end()) { - RETURN_IF_ERROR(_deal_with_more_conjunct(_conjunct_ctxs_by_slot.at(slotId), column_index, *offset_index, - _column_readers.at(slotId)->get_column_parquet_field(), - _group_reader->_param.timezone, page_filter)); - } - - if (!SIMD::contain_zero(page_filter)) { - continue; - } - - page_filtered_flag = true; - - SparseRange column_sparse_range; - for (int i = 0; i < page_num; i++) { - if (page_filter[i]) { - int64_t first_row = - offset_index->page_locations[i].first_row_index + _group_reader->_row_group_first_row; - int64_t end_row = first_row; - if (i != page_num - 1) { - end_row = offset_index->page_locations[i + 1].first_row_index + _group_reader->_row_group_first_row; - } else { - end_row = _group_reader->_row_group_first_row + _row_group_metadata->num_rows; - } - column_sparse_range.add(Range(first_row, end_row)); - } - } - sparse_range = sparse_range.intersection(column_sparse_range); - if (sparse_range.empty()) { - _group_reader->_is_group_filtered = true; - break; - } - } - - return page_filtered_flag; -} - -void PageIndexReader::select_column_offset_index() { - for (const auto& pair : _column_readers) { - pair.second->select_offset_index(_group_reader->_range, _group_reader->_row_group_first_row); - } -} -} // namespace starrocks::parquet \ No newline at end of file diff --git a/be/src/formats/parquet/page_index_reader.h b/be/src/formats/parquet/page_index_reader.h deleted file mode 100644 index 8dee10c91b2b9..0000000000000 --- a/be/src/formats/parquet/page_index_reader.h +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include -#include -#include -#include - -#include "common/global_types.h" -#include "common/status.h" -#include "common/statusor.h" -#include "exprs/expr_context.h" -#include "exprs/function_context.h" -#include "formats/parquet/column_reader.h" -#include "formats/parquet/group_reader.h" -#include "formats/parquet/schema.h" -#include "gen_cpp/parquet_types.h" -#include "io/shared_buffered_input_stream.h" -#include "runtime/types.h" -#include "storage/range.h" - -namespace starrocks { -class RandomAccessFile; - -namespace parquet { -class ColumnReader; -class GroupReader; -struct ParquetField; -} // namespace parquet -struct TypeDescriptor; -} // namespace starrocks - -namespace starrocks::parquet { - -struct ColumnOffsetIndexCtx { - tparquet::OffsetIndex offset_index; - std::vector page_selected; - uint64_t rg_first_row; - - void collect_io_range(std::vector* ranges, int64_t* end_offset, - bool active); - - // be compatible with PARQUET-1850 - bool check_dictionary_page(int64_t data_page_offset) { - return offset_index.page_locations.size() > 0 && offset_index.page_locations[0].offset > data_page_offset; - } -}; - -class PageIndexReader { -public: - PageIndexReader(GroupReader* group_reader, RandomAccessFile* file, - const std::unordered_map>& column_readers, - const tparquet::RowGroup* meta, const std::vector& min_max_conjunct_ctxs, - const std::unordered_map>& conjunct_ctxs_by_slot) - : _group_reader(group_reader), - _file(file), - _column_readers(column_readers), - _row_group_metadata(meta), - _min_max_conjunct_ctxs(min_max_conjunct_ctxs), - _conjunct_ctxs_by_slot(conjunct_ctxs_by_slot) {} - - StatusOr generate_read_range(SparseRange& sparse_range); - - void select_column_offset_index(); - -private: - void _split_min_max_conjuncts_by_slot(std::unordered_map>& slot_id_to_ctx_map); - bool _more_conjunct_for_statistics(SlotId id); - Status _deal_with_min_max_conjuncts(const std::vector& ctxs, - const tparquet::ColumnIndex& column_index, SlotId id, - const TypeDescriptor& type, Filter& page_filter); - Status _deal_with_more_conjunct(const std::vector& ctxs, const tparquet::ColumnIndex& column_index, - const tparquet::OffsetIndex& offset_index, const ParquetField* field, - const std::string& timezone, Filter& page_filter); - - GroupReader* _group_reader = nullptr; - RandomAccessFile* _file = nullptr; - // column readers for column chunk in row group - const std::unordered_map>& _column_readers; - // row group meta - const tparquet::RowGroup* _row_group_metadata = nullptr; - - // min/max conjuncts - const std::vector& _min_max_conjunct_ctxs; - - // conjuncts by slot - const std::unordered_map>& _conjunct_ctxs_by_slot; -}; - -} // namespace starrocks::parquet \ No newline at end of file diff --git a/be/src/formats/parquet/scalar_column_reader.cpp b/be/src/formats/parquet/scalar_column_reader.cpp index 773308a5d95dd..374eec6ab5ee8 100644 --- a/be/src/formats/parquet/scalar_column_reader.cpp +++ b/be/src/formats/parquet/scalar_column_reader.cpp @@ -14,6 +14,7 @@ #include "formats/parquet/scalar_column_reader.h" +#include "formats/parquet/column_reader.h" #include "formats/parquet/stored_column_reader_with_index.h" #include "formats/parquet/utils.h" #include "formats/parquet/zone_map_filter_evaluator.h" diff --git a/be/src/formats/parquet/scalar_column_reader.h b/be/src/formats/parquet/scalar_column_reader.h index e1ec5c4a0b9df..5a30971cd856f 100644 --- a/be/src/formats/parquet/scalar_column_reader.h +++ b/be/src/formats/parquet/scalar_column_reader.h @@ -16,7 +16,6 @@ #include "formats/parquet/column_converter.h" #include "formats/parquet/column_reader.h" -#include "formats/parquet/page_index_reader.h" #include "formats/parquet/stored_column_reader.h" #include "util/thrift_util.h" diff --git a/be/src/formats/parquet/stored_column_reader_with_index.h b/be/src/formats/parquet/stored_column_reader_with_index.h index e51e5465935aa..b4f6e34a91596 100644 --- a/be/src/formats/parquet/stored_column_reader_with_index.h +++ b/be/src/formats/parquet/stored_column_reader_with_index.h @@ -23,7 +23,7 @@ #include "column/vectorized_fwd.h" #include "common/status.h" -#include "formats/parquet/page_index_reader.h" +#include "formats/parquet/column_reader.h" #include "formats/parquet/stored_column_reader.h" #include "formats/parquet/types.h" #include "formats/parquet/utils.h" diff --git a/be/test/formats/parquet/file_reader_test.cpp b/be/test/formats/parquet/file_reader_test.cpp index 3ff9a62b93c2c..0222f4ec6e4a6 100644 --- a/be/test/formats/parquet/file_reader_test.cpp +++ b/be/test/formats/parquet/file_reader_test.cpp @@ -618,13 +618,16 @@ StatusOr FileReaderTest::_create_in_const_pred(SlotId slot_id, con StatusOr FileReaderTest::_create_context_for_in_filter(SlotId slot_id) { Utils::SlotDesc slot_descs[] = { {"c1", TYPE_INT_DESC, 1}, {"c2", TYPE_BIGINT_DESC, 2}, {"c3", TYPE_VARCHAR_DESC, 3}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, _all_null_parquet_file); std::vector values{1, 3, 5}; ASSIGN_OR_RETURN(auto* expr_ctx, _create_in_const_pred(slot_id, values, true, false)); std::vector expr_ctxs{expr_ctx}; + auto scan_ctx = _create_scan_context(slot_descs, _all_null_parquet_file); scan_ctx->conjunct_ctxs_by_slot.insert({slot_id, expr_ctxs}); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->conjunct_ctxs_by_slot[slot_id], nullptr, tuple_desc, + _runtime_state, scan_ctx); return scan_ctx; } @@ -632,101 +635,144 @@ StatusOr FileReaderTest::_create_context_for_in_filter(Slot StatusOr FileReaderTest::_create_context_for_in_filter_normal(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"col1", TYPE_INT_DESC, 1}, {"col2", TYPE_INT_DESC, 2}, {"col3", TYPE_INT_DESC, 3}, {"col4", TYPE_INT_DESC, 4}, {"col5", TYPE_INT_DESC, 5}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, _filter_row_group_path_1); std::vector values{5, 6}; ASSIGN_OR_RETURN(auto* expr_ctx, _create_in_const_pred(slot_id, values, false, false)); std::vector expr_ctxs{expr_ctx}; + auto scan_ctx = _create_scan_context(slot_descs, _filter_row_group_path_1); scan_ctx->conjunct_ctxs_by_slot.insert({slot_id, expr_ctxs}); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->conjunct_ctxs_by_slot[slot_id], nullptr, tuple_desc, + _runtime_state, scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_min_max_all_null_group(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c1", TYPE_INT_DESC}, {"c2", TYPE_BIGINT_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _all_null_parquet_file); std::vector t_conjuncts; ParquetUTBase::append_int_conjunct(TExprOpcode::GE, slot_id, 4, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _all_null_parquet_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_bool(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_bool", TYPE_BOOLEAN_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_int_conjunct(TExprOpcode::GE, slot_id, false, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_smallint(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_smallint", TYPE_SMALLINT_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_smallint_conjunct(TExprOpcode::GT, slot_id, 3, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_int32(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_int32", TYPE_INT_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_int_conjunct(TExprOpcode::GT, slot_id, 33, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_int64(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_int64", TYPE_BIGINT_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_bigint_conjunct(TExprOpcode::GT, slot_id, 333, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_string(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_string", TYPE_VARCHAR_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_string_conjunct(TExprOpcode::GT, slot_id, "33333", &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_decimal(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_decimal", TYPE_DECIMAL128_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_decimal_conjunct(TExprOpcode::GT, slot_id, "333.300000000", &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } StatusOr FileReaderTest::_create_context_for_has_null_page_datetime(SlotId slot_id) { Utils::SlotDesc slot_descs[] = {{"c_datetime", TYPE_DATETIME_DESC}, {""}}; - auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); std::vector t_conjuncts; ParquetUTBase::append_datetime_conjunct(TExprOpcode::GT, slot_id, "2024-01-10 00:00:00", &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &scan_ctx->min_max_conjunct_ctxs); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto scan_ctx = _create_scan_context(slot_descs, slot_descs, _has_null_page_file); + scan_ctx->min_max_conjunct_ctxs.insert(scan_ctx->min_max_conjunct_ctxs.end(), expr_ctxs.begin(), expr_ctxs.end()); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(scan_ctx->min_max_conjunct_ctxs, nullptr, tuple_desc, _runtime_state, + scan_ctx); return scan_ctx; } @@ -778,6 +824,10 @@ StatusOr FileReaderTest::_create_context_for_filter_row_gro ctx->partition_values.emplace_back(partition_col4); ctx->partition_values.emplace_back(partition_col5); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[slot_id], _rf_probe_collector, tuple_desc, + _runtime_state, ctx); + return ctx; } @@ -786,7 +836,6 @@ StatusOr FileReaderTest::_create_context_for_filter_page_in Utils::SlotDesc slot_descs[] = {{"lo_orderkey", TYPE_INT_DESC, 1}, {"col2", TYPE_INT_DESC, 2}, {"col3", TYPE_INT_DESC, 3}, {"col4", TYPE_INT_DESC, 4}, {"col5", TYPE_INT_DESC, 5}, {""}}; - auto ctx = _create_scan_context(slot_descs, _filter_row_group_path_1); auto* rf = _pool.add(new RuntimeBloomFilter()); ASSIGN_OR_RETURN(auto* rf_desc, gen_runtime_filter_desc(slot_id)); @@ -801,6 +850,15 @@ StatusOr FileReaderTest::_create_context_for_filter_page_in rf_desc->set_runtime_filter(rf); _rf_probe_collector->add_descriptor(rf_desc); + Expr* min_max_predicate = nullptr; + RuntimeFilterHelper::create_min_max_value_predicate(&_pool, slot_id, TYPE_INT, rf, &min_max_predicate); + ExprContext* expr_ctx = _pool.add(new ExprContext(min_max_predicate)); + RETURN_IF_ERROR(expr_ctx->prepare(_runtime_state)); + RETURN_IF_ERROR(expr_ctx->open(_runtime_state)); + std::vector expr_ctxs{expr_ctx}; + auto ctx = _create_scan_context(slot_descs, _filter_row_group_path_1); + ctx->conjunct_ctxs_by_slot.insert({slot_id, expr_ctxs}); + ColumnPtr partition_col3 = ColumnHelper::create_const_column(5, 1); ColumnPtr partition_col4 = ColumnHelper::create_const_column(2, 1); ColumnPtr partition_col5 = ColumnHelper::create_const_null_column(1); @@ -811,13 +869,9 @@ StatusOr FileReaderTest::_create_context_for_filter_page_in ctx->partition_values.emplace_back(partition_col4); ctx->partition_values.emplace_back(partition_col5); - Expr* min_max_predicate = nullptr; - RuntimeFilterHelper::create_min_max_value_predicate(&_pool, slot_id, TYPE_INT, rf, &min_max_predicate); - ExprContext* expr_ctx = _pool.add(new ExprContext(min_max_predicate)); - RETURN_IF_ERROR(expr_ctx->prepare(_runtime_state)); - RETURN_IF_ERROR(expr_ctx->open(_runtime_state)); - std::vector expr_ctxs{expr_ctx}; - ctx->conjunct_ctxs_by_slot.insert({slot_id, expr_ctxs}); + TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[slot_id], _rf_probe_collector, tuple_desc, + _runtime_state, ctx); return ctx; } @@ -2707,7 +2761,6 @@ TEST_F(FileReaderTest, TestStructSubfieldDictFilter) { TEST_F(FileReaderTest, TestStructSubfieldZonemap) { const std::string struct_in_struct_file_path = "./be/test/formats/parquet/test_data/test_parquet_struct_in_struct.parquet"; - auto ctx = _create_file_struct_in_struct_read_context(struct_in_struct_file_path); auto file = _create_file(struct_in_struct_file_path); @@ -2726,9 +2779,11 @@ TEST_F(FileReaderTest, TestStructSubfieldZonemap) { type_struct_in_struct.field_names.emplace_back("c_struct"); std::vector subfield_path({"c_struct", "c0"}); - + std::vector expr_ctxs; _create_struct_subfield_predicate_conjunct_ctxs(TExprOpcode::EQ, 3, type_struct_in_struct, subfield_path, "0", - &ctx->conjunct_ctxs_by_slot[3]); + &expr_ctxs); + auto ctx = _create_file_struct_in_struct_read_context(struct_in_struct_file_path); + ctx->conjunct_ctxs_by_slot.insert({3, expr_ctxs}); auto file_reader = std::make_shared(config::vector_chunk_size, file.get(), std::filesystem::file_size(struct_in_struct_file_path)); @@ -2766,7 +2821,7 @@ TEST_F(FileReaderTest, TestStructSubfieldZonemap) { }; TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); // RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs)); - ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[3], tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[3], nullptr, tuple_desc, _runtime_state, ctx); Status status = file_reader->init(ctx); ASSERT_TRUE(status.ok()); @@ -2780,9 +2835,6 @@ TEST_F(FileReaderTest, TestStructSubfieldZonemap) { total_row_nums += chunk->num_rows(); } EXPECT_EQ(0, total_row_nums); - - ctx->predicate_free_pool.clear(); - ctx->conjuncts_manager = nullptr; } TEST_F(FileReaderTest, TestReadRoundByRound) { @@ -3011,14 +3063,16 @@ TEST_F(FileReaderTest, TestIsNullStatistics) { Utils::SlotDesc slot_descs[] = { {"c0", TYPE_INT_DESC}, {"c1", TYPE_INT_DESC}, {"c2", TYPE_VARCHAR_DESC}, {"c3", TYPE_INT_ARRAY_DESC}, {""}, }; - auto ctx = _create_file_random_read_context(small_page_file, slot_descs); std::vector t_conjuncts; ParquetUTBase::is_null_pred(0, true, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &ctx->conjunct_ctxs_by_slot[0]); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto ctx = _create_file_random_read_context(small_page_file, slot_descs); + ctx->conjunct_ctxs_by_slot.insert({0, expr_ctxs}); // setup OlapScanConjunctsManager TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); - ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[0], tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[0], nullptr, tuple_desc, _runtime_state, ctx); Status status = file_reader->init(ctx); ASSERT_TRUE(status.ok()); @@ -3046,16 +3100,19 @@ TEST_F(FileReaderTest, TestInFilterStatitics) { Utils::SlotDesc slot_descs[] = { {"c0", TYPE_INT_DESC}, {"c1", TYPE_INT_DESC}, {"c2", TYPE_VARCHAR_DESC}, {"c3", TYPE_INT_ARRAY_DESC}, {""}, }; - auto ctx = _create_file_random_read_context(multi_rg_file, slot_descs); // min value and max value in this file, so it will be in the first and last row group std::set in_oprands{1, 100000}; std::vector t_conjuncts; ParquetUTBase::create_in_predicate_int_conjunct_ctxs(TExprOpcode::FILTER_IN, 0, in_oprands, &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &ctx->conjunct_ctxs_by_slot[0]); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + + auto ctx = _create_file_random_read_context(multi_rg_file, slot_descs); + ctx->conjunct_ctxs_by_slot.insert({0, expr_ctxs}); // setup OlapScanConjunctsManager TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); - ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[0], tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[0], nullptr, tuple_desc, _runtime_state, ctx); ASSERT_OK(file_reader->init(ctx)); EXPECT_EQ(file_reader->row_group_size(), 2); @@ -3064,8 +3121,6 @@ TEST_F(FileReaderTest, TestInFilterStatitics) { // parquet has no null // filter the first row group TEST_F(FileReaderTest, filter_row_group_with_rf_1) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_filter_row_group_path_1); @@ -3080,8 +3135,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_1) { // parquet has no null // filter no group TEST_F(FileReaderTest, filter_row_group_with_rf_2) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_filter_row_group_path_1); @@ -3096,8 +3149,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_2) { // parquet has no null // filter all group TEST_F(FileReaderTest, filter_row_group_with_rf_3) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_filter_row_group_path_1); @@ -3112,8 +3163,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_3) { // parquet has null // filter no group TEST_F(FileReaderTest, filter_row_group_with_rf_4) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_filter_row_group_path_2); @@ -3129,8 +3178,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_4) { // partition column has no null // filter no group TEST_F(FileReaderTest, filter_row_group_with_rf_5) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 3; auto file_reader = _create_file_reader(_filter_row_group_path_2); @@ -3146,8 +3193,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_5) { // partition column has no null // filter all group TEST_F(FileReaderTest, filter_row_group_with_rf_6) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 4; auto file_reader = _create_file_reader(_filter_row_group_path_2); @@ -3163,8 +3208,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_6) { // partition column has null // filter no group TEST_F(FileReaderTest, filter_row_group_with_rf_7) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 5; auto file_reader = _create_file_reader(_filter_row_group_path_2); @@ -3180,8 +3223,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_7) { // column not exist // filter no group TEST_F(FileReaderTest, filter_row_group_with_rf_8) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 8; auto file_reader = _create_file_reader(_filter_row_group_path_2); @@ -3194,7 +3235,6 @@ TEST_F(FileReaderTest, filter_row_group_with_rf_8) { } TEST_F(FileReaderTest, update_rf_and_filter_row_group) { - config::parquet_advance_zonemap_filter = true; SlotId slot_id = 0; auto file_reader = _create_file_reader(_filter_row_group_path_3); @@ -3226,8 +3266,6 @@ TEST_F(FileReaderTest, update_rf_and_filter_row_group) { } TEST_F(FileReaderTest, filter_page_index_with_rf_has_null) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_filter_page_index_with_rf_has_null); @@ -3241,8 +3279,6 @@ TEST_F(FileReaderTest, filter_page_index_with_rf_has_null) { } TEST_F(FileReaderTest, all_type_has_null_page_bool) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3256,8 +3292,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_bool) { } TEST_F(FileReaderTest, all_type_has_null_page_smallint) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3271,8 +3305,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_smallint) { } TEST_F(FileReaderTest, all_type_has_null_page_int) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3286,8 +3318,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_int) { } TEST_F(FileReaderTest, all_type_has_null_page_bigint) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3301,8 +3331,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_bigint) { } TEST_F(FileReaderTest, all_type_has_null_page_datetime) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3316,8 +3344,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_datetime) { } TEST_F(FileReaderTest, all_type_has_null_page_string) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3331,8 +3357,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_string) { } TEST_F(FileReaderTest, all_type_has_null_page_decimal) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_has_null_page_file); @@ -3346,8 +3370,6 @@ TEST_F(FileReaderTest, all_type_has_null_page_decimal) { } TEST_F(FileReaderTest, all_null_group_in_filter) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_all_null_parquet_file); @@ -3359,8 +3381,6 @@ TEST_F(FileReaderTest, all_null_group_in_filter) { } TEST_F(FileReaderTest, in_filter_filter_one_group) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 1; auto file_reader = _create_file_reader(_filter_row_group_path_1); @@ -3372,8 +3392,6 @@ TEST_F(FileReaderTest, in_filter_filter_one_group) { } TEST_F(FileReaderTest, min_max_filter_all_null_group) { - config::parquet_advance_zonemap_filter = false; - DeferOp defer([&]() { config::parquet_advance_zonemap_filter = true; }); SlotId slot_id = 0; auto file_reader = _create_file_reader(_all_null_parquet_file); @@ -3392,7 +3410,6 @@ TEST_F(FileReaderTest, low_card_reader) { const std::string small_page_file = "./be/test/formats/parquet/test_data/page_index_small_page.parquet"; Utils::SlotDesc slot_descs[] = {{"c0", TYPE_INT_DESC}, {"c2", TYPE_INT_DESC}, {""}}; - auto ctx = _create_file_random_read_context(small_page_file, slot_descs); std::vector values; for (int i = 0; i < 100; ++i) { @@ -3407,13 +3424,15 @@ TEST_F(FileReaderTest, low_card_reader) { } dict_map[1] = &g_dict; - ctx->global_dictmaps = &dict_map; - std::vector t_conjuncts; ParquetUTBase::create_dictmapping_string_conjunct(TExprOpcode::EQ, 1, "2", &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &ctx->conjunct_ctxs_by_slot[1]); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto ctx = _create_file_random_read_context(small_page_file, slot_descs); + ctx->conjunct_ctxs_by_slot.insert({1, expr_ctxs}); + ctx->global_dictmaps = &dict_map; TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); - ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[1], tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[1], nullptr, tuple_desc, _runtime_state, ctx); auto file_reader = _create_file_reader(small_page_file); Status status = file_reader->init(ctx); @@ -3440,15 +3459,12 @@ TEST_F(FileReaderTest, low_card_reader) { } EXPECT_EQ(200, total_row_nums); - ctx->predicate_free_pool.clear(); - ctx->conjuncts_manager = nullptr; } TEST_F(FileReaderTest, low_card_reader_filter_group) { const std::string small_page_file = "./be/test/formats/parquet/test_data/page_index_small_page.parquet"; Utils::SlotDesc slot_descs[] = {{"c0", TYPE_INT_DESC}, {"c2", TYPE_INT_DESC}, {""}}; - auto ctx = _create_file_random_read_context(small_page_file, slot_descs); std::vector values; for (int i = 0; i < 100; ++i) { @@ -3463,21 +3479,21 @@ TEST_F(FileReaderTest, low_card_reader_filter_group) { } dict_map[1] = &g_dict; - ctx->global_dictmaps = &dict_map; - std::vector t_conjuncts; ParquetUTBase::create_dictmapping_string_conjunct(TExprOpcode::GT, 1, "a", &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &ctx->conjunct_ctxs_by_slot[1]); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto ctx = _create_file_random_read_context(small_page_file, slot_descs); + ctx->conjunct_ctxs_by_slot.insert({1, expr_ctxs}); + ctx->global_dictmaps = &dict_map; TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); tuple_desc->decoded_slots()[1]->type().type = TYPE_VARCHAR; - ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[1], tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[1], nullptr, tuple_desc, _runtime_state, ctx); auto file_reader = _create_file_reader(small_page_file); Status status = file_reader->init(ctx); ASSERT_TRUE(status.ok()); EXPECT_EQ(file_reader->row_group_size(), 0); - ctx->predicate_free_pool.clear(); - ctx->conjuncts_manager = nullptr; } TEST_F(FileReaderTest, low_card_reader_dict_not_match) { @@ -3624,7 +3640,6 @@ TEST_F(FileReaderTest, low_rows_reader_filter_group) { const std::string small_page_file = "./be/test/formats/parquet/test_data/low_rows_non_dict.parquet"; Utils::SlotDesc slot_descs[] = {{"c0", TYPE_INT_DESC}, {"c2", TYPE_INT_DESC}, {""}}; - auto ctx = _create_file_random_read_context(small_page_file, slot_descs); std::vector values; for (int i = 0; i < 100; ++i) { @@ -3639,22 +3654,21 @@ TEST_F(FileReaderTest, low_rows_reader_filter_group) { } dict_map[1] = &g_dict; - ctx->global_dictmaps = &dict_map; - std::vector t_conjuncts; ParquetUTBase::create_dictmapping_string_conjunct(TExprOpcode::EQ, 1, "a", &t_conjuncts); - ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &ctx->conjunct_ctxs_by_slot[1]); + std::vector expr_ctxs; + ParquetUTBase::create_conjunct_ctxs(&_pool, _runtime_state, &t_conjuncts, &expr_ctxs); + auto ctx = _create_file_random_read_context(small_page_file, slot_descs); + ctx->conjunct_ctxs_by_slot.insert({1, expr_ctxs}); + ctx->global_dictmaps = &dict_map; TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs); tuple_desc->decoded_slots()[1]->type().type = TYPE_VARCHAR; - ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[1], tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(ctx->conjunct_ctxs_by_slot[1], nullptr, tuple_desc, _runtime_state, ctx); auto file_reader = _create_file_reader(small_page_file); Status status = file_reader->init(ctx); ASSERT_TRUE(status.ok()); EXPECT_EQ(file_reader->row_group_size(), 0); - - ctx->predicate_free_pool.clear(); - ctx->conjuncts_manager = nullptr; } } // namespace starrocks::parquet diff --git a/be/test/formats/parquet/page_index_test.cpp b/be/test/formats/parquet/page_index_test.cpp index e09b67a933c28..d2721089ecd6f 100644 --- a/be/test/formats/parquet/page_index_test.cpp +++ b/be/test/formats/parquet/page_index_test.cpp @@ -338,8 +338,7 @@ TEST_F(PageIndexTest, TestRandomReadWith2PageSize) { } TEST_F(PageIndexTest, TestCollectIORangeWithPageIndex) { - auto test = [&](bool enable_advanced_zone_map) { - config::parquet_advance_zonemap_filter = enable_advanced_zone_map; + auto test = [&]() { auto chunk = std::make_shared(); chunk->append_column( ColumnHelper::create_column(TypeDescriptor::from_logical_type(LogicalType::TYPE_INT), true), @@ -378,7 +377,7 @@ TEST_F(PageIndexTest, TestCollectIORangeWithPageIndex) { for (auto* expr : ctx->conjunct_ctxs_by_slot[0]) { all_conjuncts.push_back(expr); } - ParquetUTBase::setup_conjuncts_manager(all_conjuncts, tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(all_conjuncts, nullptr, tuple_desc, _runtime_state, ctx); auto file_reader = std::make_shared(config::vector_chunk_size, file.get(), std::filesystem::file_size(small_page_file)); @@ -419,15 +418,11 @@ TEST_F(PageIndexTest, TestCollectIORangeWithPageIndex) { EXPECT_EQ(total_row_nums, 1999); }; - bool origin_value = config::parquet_advance_zonemap_filter; - test(true); - test(false); - config::parquet_advance_zonemap_filter = origin_value; + test(); } TEST_F(PageIndexTest, TestTwoColumnIntersectPageIndex) { - auto test = [&](bool enable_advanced_zone_map) { - config::parquet_advance_zonemap_filter = enable_advanced_zone_map; + auto test = [&]() { auto chunk = std::make_shared(); chunk->append_column( ColumnHelper::create_column(TypeDescriptor::from_logical_type(LogicalType::TYPE_INT), true), @@ -486,7 +481,7 @@ TEST_F(PageIndexTest, TestTwoColumnIntersectPageIndex) { for (auto* expr : ctx->conjunct_ctxs_by_slot[1]) { all_conjuncts.push_back(expr); } - ParquetUTBase::setup_conjuncts_manager(all_conjuncts, tuple_desc, _runtime_state, ctx); + ParquetUTBase::setup_conjuncts_manager(all_conjuncts, nullptr, tuple_desc, _runtime_state, ctx); auto shared_buffer = std::make_shared( file->stream(), small_page_file, std::filesystem::file_size(small_page_file)); @@ -533,10 +528,7 @@ TEST_F(PageIndexTest, TestTwoColumnIntersectPageIndex) { EXPECT_EQ(total_row_nums, 10000); }; - bool origin_value = config::parquet_advance_zonemap_filter; - test(true); - test(false); - config::parquet_advance_zonemap_filter = origin_value; + test(); } TEST_F(PageIndexTest, TestPageIndexNoPageFiltered) { diff --git a/be/test/formats/parquet/parquet_ut_base.cpp b/be/test/formats/parquet/parquet_ut_base.cpp index 8bbdd51fb152e..05c14c276e4f1 100644 --- a/be/test/formats/parquet/parquet_ut_base.cpp +++ b/be/test/formats/parquet/parquet_ut_base.cpp @@ -252,13 +252,14 @@ void ParquetUTBase::create_in_predicate_date_conjunct_ctxs(TExprOpcode::type opc tExprs->emplace_back(t_expr); } -void ParquetUTBase::setup_conjuncts_manager(std::vector& conjuncts, TupleDescriptor* tuple_desc, - RuntimeState* runtime_state, HdfsScannerContext* params) { +void ParquetUTBase::setup_conjuncts_manager(std::vector& conjuncts, const RuntimeFilterProbeCollector* rf, + TupleDescriptor* tuple_desc, RuntimeState* runtime_state, + HdfsScannerContext* params) { ScanConjunctsManagerOptions opts; opts.conjunct_ctxs_ptr = &conjuncts; opts.tuple_desc = tuple_desc; opts.obj_pool = runtime_state->obj_pool(); - opts.runtime_filters = runtime_state->obj_pool()->add(new RuntimeFilterProbeCollector()); + opts.runtime_filters = rf; opts.runtime_state = runtime_state; opts.enable_column_expr_predicate = true; opts.is_olap_scan = false; diff --git a/be/test/formats/parquet/parquet_ut_base.h b/be/test/formats/parquet/parquet_ut_base.h index 3e09a0738e9c1..50f47fba94ec4 100644 --- a/be/test/formats/parquet/parquet_ut_base.h +++ b/be/test/formats/parquet/parquet_ut_base.h @@ -56,8 +56,9 @@ class ParquetUTBase { TPrimitiveType::type type, std::set& values, std::vector* tExprs); - static void setup_conjuncts_manager(std::vector& conjuncts, TupleDescriptor* tuple_desc, - RuntimeState* runtime_state, HdfsScannerContext* params); + static void setup_conjuncts_manager(std::vector& conjuncts, const RuntimeFilterProbeCollector* rf, + TupleDescriptor* tuple_desc, RuntimeState* runtime_state, + HdfsScannerContext* params); static void create_dictmapping_string_conjunct(TExprOpcode::type opcode, SlotId slot_id, const std::string& value, std::vector* tExprs);