Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #39608

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ set(PARQUET_SRCS
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
row_range.cc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sort it in alphabetical order.

column_reader.cc
column_scanner.cc
column_writer.cc
Expand Down Expand Up @@ -366,6 +367,8 @@ add_parquet_test(reader-test
level_conversion_test.cc
column_scanner_test.cc
reader_test.cc
range_reader_test.cc
row_range_test.cc
stream_reader_test.cc
test_util.cc)

Expand Down
220 changes: 200 additions & 20 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "parquet/arrow/reader.h"

#include "parquet/page_index.h"

#include <algorithm>
#include <cstring>
#include <memory>
Expand Down Expand Up @@ -199,10 +201,11 @@ class FileReaderImpl : public FileReader {
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
}

Status GetFieldReader(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::unique_ptr<ColumnReaderImpl>* out) {
Status GetFieldReader(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function signature now looks strange to me since it contains conflicting parameters const std::vector<int>& row_groups and const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg.

What about splitting file-based row ranges lazily? Then we can re-define this as

  // RowGroups can be either std::vector<int> or RowRanges
  template <typename RowGroups>
  Status GetFieldReader(int i,
                        const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                        const RowGroups& row_groups,
                        std::unique_ptr<ColumnReaderImpl>* out);

Or if you still want to split file-based row ranges eagerly, we can do this:

  // RowGroups can be either std::vector<int> or std::map<int, RowRanges>
  template <typename RowGroups>
  Status GetFieldReader(int i,
                        const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                        const RowGroups& row_groups,
                        std::unique_ptr<ColumnReaderImpl>* out);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can limit the scope of refactoring work to overloading SomeRowGroupsFactory below at line 221:

ctx->iterator_factory = SomeRowGroupsFactory(row_groups);

int i, const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::unique_ptr<ColumnReaderImpl>* out) {
// Should be covered by GetRecordBatchReader checks but
// manifest_.schema_fields is a separate variable so be extra careful.
if (ARROW_PREDICT_FALSE(i < 0 ||
Expand All @@ -218,13 +221,16 @@ class FileReaderImpl : public FileReader {
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
ctx->filter_leaves = true;
ctx->included_leaves = included_leaves;
ctx->row_ranges_per_rg =
row_ranges_per_rg; // copy the shared pointer to extend its lifecycle
return GetReader(manifest_.schema_fields[i], ctx, out);
}

Status GetFieldReaders(const std::vector<int>& column_indices,
const std::vector<int>& row_groups,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
Status GetFieldReaders(
const std::vector<int>& column_indices, const std::vector<int>& row_groups,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
Expand All @@ -236,8 +242,8 @@ class FileReaderImpl : public FileReader {
::arrow::FieldVector out_fields(field_indices.size());
for (size_t i = 0; i < out->size(); ++i) {
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(
GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));
RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_groups,
row_ranges_per_rg, &reader));

out_fields[i] = reader->field();
out->at(i) = std::move(reader);
Expand Down Expand Up @@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}

// This is a internal API owned by FileReaderImpl, not exposed in FileReader
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move it under anonymous namespace.

Status GetRecordBatchReaderWithRowRanges(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::unique_ptr<RecordBatchReader>* out);

// Row-range based overload: a single file-based RowRanges selects both which
// row groups are read and which rows are returned within each of them.
Status GetRecordBatchReader(const RowRanges& rows_to_return,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) override {
const auto metadata = reader_->metadata();
// check if the row ranges are within the row group boundaries
if (rows_to_return.num_rows() != 0 &&
rows_to_return.last_row() >= metadata->num_rows()) {
return Status::Invalid("The provided row range " + rows_to_return.ToString() +
" exceeds the number of rows in the file: " +
std::to_string(metadata->num_rows()));
}
// An empty range selects nothing: delegate with no row groups at all.
if (rows_to_return.num_rows() == 0) {
return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out);
}

// Per-row-group row counts, used below to split the file-based ranges into
// one row-group-based RowRanges per row group.
std::vector<int64_t> rows_per_rg;
for (int i = 0; i < metadata->num_row_groups(); i++) {
rows_per_rg.push_back(metadata->RowGroup(i)->num_rows());
}
// We'll assign a RowRanges for each RG, even if it's not required to return any rows
std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg =
rows_to_return.SplitByRowRange(rows_per_rg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it too early to split the RowRanges into row groups? We can probably do this lazily for each row group. For example, we can start with row group 0 and test if it falls into the range. If true, compute its overlapping range and move all remaining ranges for the next round (row group 1, 2, etc.)

In this way, we can make GetFieldReaders much simpler and less confusing.

// Only row groups that contribute at least one requested row are read.
std::vector<int> row_group_indices;
for (int i = 0; i < metadata->num_row_groups(); i++) {
if (row_ranges_per_rg.at(i)->num_rows() > 0) row_group_indices.push_back(i);
}

// The shared_ptr extends the split ranges' lifetime to the column readers
// created downstream (stored in ReaderContext::row_ranges_per_rg).
return GetRecordBatchReaderWithRowRanges(
row_group_indices, column_indices,
std::make_shared<std::vector<std::unique_ptr<RowRanges>>>(
std::move(row_ranges_per_rg)),
out);
}

// Read the selected row groups / columns in full: delegate to the
// row-range-aware implementation with no per-row-group ranges.
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                            const std::vector<int>& column_indices,
                            std::unique_ptr<RecordBatchReader>* out) override {
  return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices,
                                           /*row_ranges_per_rg=*/{}, out);
}

// Read the selected row groups with every column included.
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                            std::unique_ptr<RecordBatchReader>* out) override {
  const std::vector<int> all_columns = Iota(reader_->metadata()->num_columns());
  return GetRecordBatchReaderWithRowRanges(row_group_indices, all_columns,
                                           /*row_ranges_per_rg=*/{}, out);
}

// Read the whole file: every row group, every column, no row-range filtering.
Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override {
  const std::vector<int> all_row_groups = Iota(num_row_groups());
  const std::vector<int> all_columns = Iota(reader_->metadata()->num_columns());
  return GetRecordBatchReaderWithRowRanges(all_row_groups, all_columns,
                                           /*row_ranges_per_rg=*/{}, out);
}

::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
Expand Down Expand Up @@ -440,6 +488,64 @@ class RowGroupReaderImpl : public RowGroupReader {
// ----------------------------------------------------------------------
// Column reader implementations

// This class is used to skip decompressing & decoding unnecessary pages by comparing
// user-specified row_ranges and page_ranges from metadata. Only support IntervalRange
// case for now.
// Predicate installed via PageReader::set_data_page_filter(). It is invoked
// once per data page, in page order; returning true filters the page out so
// it is neither decompressed nor decoded. The decision consumes one interval
// from page_ranges_ per call and advances through row_ranges_ as pages pass
// by, so calls must happen exactly once per page and in order.
class RowRangesPageFilter {
public:
// row_ranges: rows requested by the user (row-group based).
// page_ranges: one interval per data page of this column chunk, in order.
RowRangesPageFilter(const RowRanges& row_ranges,
const std::shared_ptr<RowRanges>& page_ranges)
: row_ranges_(row_ranges), page_ranges_(page_ranges) {}

// To avoid error "std::function target must be copy-constructible", we must define copy
// constructor
// NOTE(review): the copy intentionally does not carry over the iterators or
// `initted`, so a copy taken mid-iteration restarts from the first range —
// presumably copies only happen when the std::function is constructed,
// before the first call; confirm.
RowRangesPageFilter(const RowRangesPageFilter& other)
: row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {}

// Returns true iff the page corresponding to this call overlaps none of the
// requested row ranges (i.e. the page should be skipped). `stats` is not
// inspected; the pairing between calls and pages is purely positional.
bool operator()(const DataPageStats& stats) {
// Lazy first-call setup: create both iterators and position the row-range
// cursor on the first requested interval. variant index() == 0 means the
// value is an IntervalRange (the only supported alternative here).
if (!initted) {
row_ranges_itr_ = row_ranges_.NewIterator();
page_ranges_itr_ = page_ranges_->NewIterator();

current_row_range_ = row_ranges_itr_->NextRange();

if (current_row_range_.index() != 0) {
throw ParquetException(
"RowRangesPageFilter expects first NextRange() to be a IntervalRange");
}
initted = true;
}

// Each invocation consumes the next page interval.
current_page_range_ = page_ranges_itr_->NextRange();
if (current_page_range_.index() != 0) {
throw ParquetException(
"RowRangesPageFilter expects first NextRange() to be a IntervalRange");
}

// Drop row ranges that end before this page starts; they can never match
// any later page either, since pages arrive in ascending order.
while (current_row_range_.index() == 0 &&
IntervalRangeUtils::IsAfter(std::get<IntervalRange>(current_page_range_),
std::get<IntervalRange>(current_row_range_))) {
current_row_range_ = row_ranges_itr_->NextRange();
}

// Row ranges exhausted (iterator returned End): skip this and, on
// subsequent calls, all remaining pages.
if (current_row_range_.index() != 0) {
return true;
}

// Skip the page if it lies entirely before the current row range;
// otherwise they overlap and the page must be read.
return IntervalRangeUtils::IsBefore(std::get<IntervalRange>(current_page_range_),
std::get<IntervalRange>(current_row_range_));
}

private:
const RowRanges& row_ranges_;
const std::shared_ptr<RowRanges> page_ranges_;
// Iteration state; created lazily on the first operator() call.
std::unique_ptr<RowRanges::Iterator> row_ranges_itr_ = NULLPTR;
std::unique_ptr<RowRanges::Iterator> page_ranges_itr_ = NULLPTR;
// End() marks "not yet fetched" / "iterator exhausted".
std::variant<IntervalRange, BitmapRange, End> current_row_range_ = End();
std::variant<IntervalRange, BitmapRange, End> current_page_range_ = End();
bool initted = false;
};

// Leaf reader is for primitive arrays and primitive children of nested arrays
class LeafReader : public ColumnReaderImpl {
public:
Expand Down Expand Up @@ -501,8 +607,79 @@ class LeafReader : public ColumnReaderImpl {

private:
std::shared_ptr<ChunkedArray> out_;

// Builds the per-data-page row intervals for the current row group from the
// column's OffsetIndex and validates the requested row ranges against them.
//
// \param row_ranges rows requested for the current row group (row-group based).
// \param page_ranges out parameter: one IntervalRange per data page, where
//        page i spans [first_row_index(i), first_row_index(i+1) - 1] and the
//        last page extends to the row group's final row.
// \throws ParquetException if the page index or offset index is missing, or
//         if row_ranges reaches past the last page.
void checkAndGetPageRanges(const RowRanges& row_ranges,
std::shared_ptr<IntervalRanges>& page_ranges) const {
// check offset exists
const auto rg_pg_index_reader =
ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group());

if (!rg_pg_index_reader) {
throw ParquetException(
"Attempting to read with Ranges but Page Index is not found for Row "
"Group: " +
std::to_string(input_->current_row_group()));
}
const auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index());

if (!offset_index) {
throw ParquetException(
"Attempting to read with Ranges but Offset index is not found for "
"column: " +
field_->name());
}

const auto page_locations = offset_index->page_locations();
page_ranges = std::make_shared<IntervalRanges>();
// Use `i + 1 < size()` rather than `i < size() - 1`: the latter underflows
// (size_t wraparound) when page_locations is empty and would iterate over
// garbage indices.
for (size_t i = 0; i + 1 < page_locations.size(); i++) {
page_ranges->Add(
{page_locations[i].first_row_index, page_locations[i + 1].first_row_index - 1});
}
if (!page_locations.empty()) {
// The last page runs to the final row of the row group.
page_ranges->Add(
{page_locations.back().first_row_index,
ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() -
1});
}

if (row_ranges.num_rows() > 0) {
// Guard the empty-page case explicitly: last_row()/GetRanges().back() on
// an empty range set would otherwise be invalid.
if (page_ranges->GetRanges().empty()) {
throw ParquetException("The provided row range " + row_ranges.ToString() +
" is invalid: the offset index contains no data pages");
}
if (row_ranges.last_row() > page_ranges->last_row()) {
throw ParquetException(
"The provided row range " + row_ranges.ToString() + " exceeds last page :" +
IntervalRangeUtils::ToString(page_ranges->GetRanges().back()));
}
}
}

// Advances the column iterator to the next row group and installs its page
// reader on the record reader. When per-row-group RowRanges are present, row
// groups whose ranges are empty are skipped entirely, and for the others a
// page filter plus a record skipper are installed to avoid decoding
// unnecessary pages/rows.
void NextRowGroup() {
// Loop instead of self-recursing to skip empty row groups: the original
// recursive call grew the stack proportionally to the number of
// consecutive row groups with no requested rows.
while (true) {
std::unique_ptr<PageReader> page_reader = input_->NextChunk();

/// using page index to reduce cost
if (page_reader != nullptr && ctx_->row_ranges_per_rg) {
// reset skipper
record_reader_->reset_record_skipper();

const auto& row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()];
// no rows requested from this row group: advance to the next one
if (row_ranges->num_rows() == 0) {
continue;
}

// Use IntervalRanges to represent pages
std::shared_ptr<IntervalRanges> page_ranges;
checkAndGetPageRanges(*row_ranges, page_ranges);

// part 1, skip decompressing & decoding unnecessary pages
page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges));

// part 2, skip unnecessary rows in necessary pages
record_reader_->set_record_skipper(
std::make_unique<parquet::internal::RecordSkipper>(*page_ranges, *row_ranges));
}

record_reader_->reset_current_rg_processed_records();
record_reader_->SetPageReader(std::move(page_reader));
return;
}
}

Expand Down Expand Up @@ -971,9 +1148,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&

} // namespace

Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) {
Status FileReaderImpl::GetRecordBatchReaderWithRowRanges(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::unique_ptr<RecordBatchReader>* out) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

if (reader_properties_.pre_buffer()) {
Expand All @@ -986,7 +1164,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,

std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> batch_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, row_ranges_per_rg, &readers,
&batch_schema));

if (readers.empty()) {
// Just generate all batches right now; they're cheap since they have no columns.
Expand Down Expand Up @@ -1241,7 +1420,8 @@ Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
// in a sync context too so use `this` over `self`
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
RETURN_NOT_OK(
GetFieldReaders(column_indices, row_groups, {}, &readers, &result_schema));
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();

Expand Down
14 changes: 14 additions & 0 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <vector>

#include "parquet/column_reader.h"
#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -180,6 +181,19 @@ class PARQUET_EXPORT FileReader {
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;

/// \brief Return a RecordBatchReader of row groups selected from
/// rows_to_return, whose columns are selected by column_indices.
///
/// Note that rows_to_return is file-based: it not only decides which row groups to
/// read, but also which rows to read in each row group.
///
///
Comment on lines +188 to +190
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// read, but also which rows to read in each row group.
///
///
/// read, but also which rows to read in each row group.
///

/// \returns error Status if either rows_to_return or column_indices
/// contains an invalid index
virtual ::arrow::Status GetRecordBatchReader(
const RowRanges& rows_to_return, const std::vector<int>& column_indices,
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;

/// \brief Return a RecordBatchReader of row groups selected from
/// row_group_indices, whose columns are selected by column_indices.
///
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/arrow/reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class FileColumnIterator {
}

auto row_group_reader = reader_->RowGroup(row_groups_.front());
current_rg_ = row_groups_.front();
row_groups_.pop_front();
return row_group_reader->GetColumnPageReader(column_index_);
}
Expand All @@ -88,11 +89,14 @@ class FileColumnIterator {

int column_index() const { return column_index_; }

int current_row_group() const { return current_rg_; }

protected:
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
std::deque<int> row_groups_;
int current_rg_ = 0;
};

using FileColumnIteratorFactory =
Expand All @@ -109,6 +113,7 @@ struct ReaderContext {
FileColumnIteratorFactory iterator_factory;
bool filter_leaves;
std::shared_ptr<std::unordered_set<int>> included_leaves;
std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>> row_ranges_per_rg;

bool IncludesLeaf(int leaf_index) const {
if (this->filter_leaves) {
Expand Down
Loading
Loading