Skip to content

Commit

Permalink
[Refactor]delete old way to deal with metadata in parquet reader (Sta…
Browse files Browse the repository at this point in the history
…rRocks#55451)

Signed-off-by: zombee0 <ewang2027@gmail.com>
  • Loading branch information
zombee0 authored Feb 5, 2025
1 parent 7c7442b commit d925914
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 875 deletions.
1 change: 0 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ CONF_mBool(parquet_coalesce_read_enable, "true");
CONF_Bool(parquet_late_materialization_enable, "true");
CONF_Bool(parquet_page_index_enable, "true");
CONF_mBool(parquet_statistics_process_more_filter_enable, "true");
CONF_mBool(parquet_advance_zonemap_filter, "true");

CONF_Int32(io_coalesce_read_max_buffer_size, "8388608");
CONF_Int32(io_coalesce_read_max_distance_size, "1048576");
Expand Down
37 changes: 18 additions & 19 deletions be/src/exec/hdfs_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,24 @@ Status HdfsScanner::_build_scanner_context() {
ctx.connector_max_split_size = _scanner_params.connector_max_split_size;
ctx.global_dictmaps = _scanner_params.global_dictmaps;

if (config::parquet_advance_zonemap_filter) {
ScanConjunctsManagerOptions opts;
opts.conjunct_ctxs_ptr = &_scanner_params.all_conjunct_ctxs;
opts.tuple_desc = _scanner_params.tuple_desc;
opts.obj_pool = _runtime_state->obj_pool();
opts.runtime_filters = _scanner_params.runtime_filter_collector;
opts.runtime_state = _runtime_state;
opts.enable_column_expr_predicate = true;
opts.is_olap_scan = false;
opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params();
ctx.conjuncts_manager = std::make_unique<ScanConjunctsManager>(std::move(opts));
RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts());
auto* predicate_parser =
opts.obj_pool->add(new ConnectorPredicateParser(&_scanner_params.tuple_desc->decoded_slots()));
ASSIGN_OR_RETURN(ctx.predicate_tree,
ctx.conjuncts_manager->get_predicate_tree(predicate_parser, ctx.predicate_free_pool));
ctx.rf_scan_range_pruner = opts.obj_pool->add(
new RuntimeScanRangePruner(predicate_parser, ctx.conjuncts_manager->unarrived_runtime_filters()));
}
ScanConjunctsManagerOptions opts;
opts.conjunct_ctxs_ptr = &_scanner_params.all_conjunct_ctxs;
opts.tuple_desc = _scanner_params.tuple_desc;
opts.obj_pool = _runtime_state->obj_pool();
opts.runtime_filters = _scanner_params.runtime_filter_collector;
opts.runtime_state = _runtime_state;
opts.enable_column_expr_predicate = true;
opts.is_olap_scan = false;
opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params();
ctx.conjuncts_manager = std::make_unique<ScanConjunctsManager>(std::move(opts));
RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts());
auto* predicate_parser =
opts.obj_pool->add(new ConnectorPredicateParser(&_scanner_params.tuple_desc->decoded_slots()));
ASSIGN_OR_RETURN(ctx.predicate_tree,
ctx.conjuncts_manager->get_predicate_tree(predicate_parser, ctx.predicate_free_pool));
ctx.rf_scan_range_pruner = opts.obj_pool->add(
new RuntimeScanRangePruner(predicate_parser, ctx.conjuncts_manager->unarrived_runtime_filters()));

return Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion be/src/formats/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ add_library(Formats STATIC
parquet/schema.cpp
parquet/stored_column_reader.cpp
parquet/stored_column_reader_with_index.cpp
parquet/page_index_reader.cpp
parquet/utils.cpp
parquet/metadata.cpp
parquet/meta_helper.cpp
Expand Down
14 changes: 12 additions & 2 deletions be/src/formats/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#include <glog/logging.h>

#include <algorithm>
#include <ostream>
#include <unordered_map>
#include <utility>

#include "column/chunk.h"
Expand All @@ -36,6 +34,18 @@

namespace starrocks::parquet {

void ColumnOffsetIndexCtx::collect_io_range(std::vector<io::SharedBufferedInputStream::IORange>* ranges,
int64_t* end_offset, bool active) {
for (size_t i = 0; i < page_selected.size(); i++) {
if (page_selected[i]) {
auto r = io::SharedBufferedInputStream::IORange(
offset_index.page_locations[i].offset, offset_index.page_locations[i].compressed_page_size, active);
ranges->emplace_back(r);
*end_offset = std::max(*end_offset, r.offset + r.size);
}
}
}

Status ColumnDictFilterContext::rewrite_conjunct_ctxs_to_predicate(StoredColumnReader* reader,
bool* is_group_filtered) {
// create dict value chunk for evaluation.
Expand Down
14 changes: 14 additions & 0 deletions be/src/formats/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ struct ParquetField;

namespace starrocks::parquet {

struct ColumnOffsetIndexCtx {
tparquet::OffsetIndex offset_index;
std::vector<bool> page_selected;
uint64_t rg_first_row;

void collect_io_range(std::vector<io::SharedBufferedInputStream::IORange>* ranges, int64_t* end_offset,
bool active);

// be compatible with PARQUET-1850
bool check_dictionary_page(int64_t data_page_offset) {
return offset_index.page_locations.size() > 0 && offset_index.page_locations[0].offset > data_page_offset;
}
};

struct ColumnReaderOptions {
std::string timezone;
bool case_sensitive = false;
Expand Down
Loading

0 comments on commit d925914

Please sign in to comment.