-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #39608
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
|
||
#include "parquet/arrow/reader.h" | ||
|
||
#include "parquet/page_index.h" | ||
|
||
#include <algorithm> | ||
#include <cstring> | ||
#include <memory> | ||
|
@@ -199,10 +201,11 @@ class FileReaderImpl : public FileReader { | |
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); | ||
} | ||
|
||
Status GetFieldReader(int i, | ||
const std::shared_ptr<std::unordered_set<int>>& included_leaves, | ||
const std::vector<int>& row_groups, | ||
std::unique_ptr<ColumnReaderImpl>* out) { | ||
Status GetFieldReader( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function signature now looks strange to me since it contains conflicting parameters What about splitting file-based row ranges lazily? Then we can re-define this as
Or if you still want to split file-based row ranges eagerly, we can do this:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then we can limit the scope of refactering work to overload SomeRowGroupsFactory below at line 221:
|
||
int i, const std::shared_ptr<std::unordered_set<int>>& included_leaves, | ||
const std::vector<int>& row_groups, | ||
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg, | ||
std::unique_ptr<ColumnReaderImpl>* out) { | ||
// Should be covered by GetRecordBatchReader checks but | ||
// manifest_.schema_fields is a separate variable so be extra careful. | ||
if (ARROW_PREDICT_FALSE(i < 0 || | ||
|
@@ -218,13 +221,16 @@ class FileReaderImpl : public FileReader { | |
ctx->iterator_factory = SomeRowGroupsFactory(row_groups); | ||
ctx->filter_leaves = true; | ||
ctx->included_leaves = included_leaves; | ||
ctx->row_ranges_per_rg = | ||
row_ranges_per_rg; // copy the shared pointer to extend its lifecycle | ||
return GetReader(manifest_.schema_fields[i], ctx, out); | ||
} | ||
|
||
Status GetFieldReaders(const std::vector<int>& column_indices, | ||
const std::vector<int>& row_groups, | ||
std::vector<std::shared_ptr<ColumnReaderImpl>>* out, | ||
std::shared_ptr<::arrow::Schema>* out_schema) { | ||
Status GetFieldReaders( | ||
const std::vector<int>& column_indices, const std::vector<int>& row_groups, | ||
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg, | ||
std::vector<std::shared_ptr<ColumnReaderImpl>>* out, | ||
std::shared_ptr<::arrow::Schema>* out_schema) { | ||
// We only need to read schema fields which have columns indicated | ||
// in the indices vector | ||
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices, | ||
|
@@ -236,8 +242,8 @@ class FileReaderImpl : public FileReader { | |
::arrow::FieldVector out_fields(field_indices.size()); | ||
for (size_t i = 0; i < out->size(); ++i) { | ||
std::unique_ptr<ColumnReaderImpl> reader; | ||
RETURN_NOT_OK( | ||
GetFieldReader(field_indices[i], included_leaves, row_groups, &reader)); | ||
RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_groups, | ||
row_ranges_per_rg, &reader)); | ||
|
||
out_fields[i] = reader->field(); | ||
out->at(i) = std::move(reader); | ||
|
@@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader { | |
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); | ||
} | ||
|
||
// This is a internal API owned by FileReaderImpl, not exposed in FileReader | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please move it under anonymous namespace. |
||
Status GetRecordBatchReaderWithRowRanges( | ||
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, | ||
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg, | ||
std::unique_ptr<RecordBatchReader>* out); | ||
|
||
Status GetRecordBatchReader(const RowRanges& rows_to_return, | ||
const std::vector<int>& column_indices, | ||
std::unique_ptr<RecordBatchReader>* out) override { | ||
const auto metadata = reader_->metadata(); | ||
// check if the row ranges are within the row group boundaries | ||
if (rows_to_return.num_rows() != 0 && | ||
rows_to_return.last_row() >= metadata->num_rows()) { | ||
return Status::Invalid("The provided row range " + rows_to_return.ToString() + | ||
" exceeds the number of rows in the file: " + | ||
std::to_string(metadata->num_rows())); | ||
} | ||
if (rows_to_return.num_rows() == 0) { | ||
return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out); | ||
} | ||
|
||
std::vector<int64_t> rows_per_rg; | ||
for (int i = 0; i < metadata->num_row_groups(); i++) { | ||
rows_per_rg.push_back(metadata->RowGroup(i)->num_rows()); | ||
} | ||
// We'll assign a RowRanges for each RG, even if it's not required to return any rows | ||
std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg = | ||
rows_to_return.SplitByRowRange(rows_per_rg); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it too early to split the RowRanges into row groups? We can probably do this lazily for each row group. For example, we can start with row group 0 and test if it falls into the range. If true, compute its overlapping range and move all remaining ranges for the next round (row group 1, 2, etc.) In this way, we can make |
||
std::vector<int> row_group_indices; | ||
for (int i = 0; i < metadata->num_row_groups(); i++) { | ||
if (row_ranges_per_rg.at(i)->num_rows() > 0) row_group_indices.push_back(i); | ||
} | ||
|
||
return GetRecordBatchReaderWithRowRanges( | ||
row_group_indices, column_indices, | ||
std::make_shared<std::vector<std::unique_ptr<RowRanges>>>( | ||
std::move(row_ranges_per_rg)), | ||
out); | ||
} | ||
|
||
Status GetRecordBatchReader(const std::vector<int>& row_group_indices, | ||
const std::vector<int>& column_indices, | ||
std::unique_ptr<RecordBatchReader>* out) override; | ||
std::unique_ptr<RecordBatchReader>* out) override { | ||
return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices, {}, out); | ||
} | ||
|
||
Status GetRecordBatchReader(const std::vector<int>& row_group_indices, | ||
std::unique_ptr<RecordBatchReader>* out) override { | ||
return GetRecordBatchReader(row_group_indices, | ||
Iota(reader_->metadata()->num_columns()), out); | ||
return GetRecordBatchReaderWithRowRanges( | ||
row_group_indices, Iota(reader_->metadata()->num_columns()), {}, out); | ||
} | ||
|
||
Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override { | ||
return GetRecordBatchReader(Iota(num_row_groups()), | ||
Iota(reader_->metadata()->num_columns()), out); | ||
return GetRecordBatchReaderWithRowRanges( | ||
Iota(num_row_groups()), Iota(reader_->metadata()->num_columns()), {}, out); | ||
} | ||
|
||
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>> | ||
|
@@ -440,6 +488,64 @@ class RowGroupReaderImpl : public RowGroupReader { | |
// ---------------------------------------------------------------------- | ||
// Column reader implementations | ||
|
||
// This class is used to skip decompressing & decoding unnecessary pages by comparing | ||
// user-specified row_ranges and page_ranges from metadata. Only support IntervalRange | ||
// case for now. | ||
class RowRangesPageFilter { | ||
public: | ||
RowRangesPageFilter(const RowRanges& row_ranges, | ||
const std::shared_ptr<RowRanges>& page_ranges) | ||
: row_ranges_(row_ranges), page_ranges_(page_ranges) {} | ||
|
||
// To avoid error "std::function target must be copy-constructible", we must define copy | ||
// constructor | ||
RowRangesPageFilter(const RowRangesPageFilter& other) | ||
: row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {} | ||
|
||
bool operator()(const DataPageStats& stats) { | ||
if (!initted) { | ||
row_ranges_itr_ = row_ranges_.NewIterator(); | ||
page_ranges_itr_ = page_ranges_->NewIterator(); | ||
|
||
current_row_range_ = row_ranges_itr_->NextRange(); | ||
|
||
if (current_row_range_.index() != 0) { | ||
throw ParquetException( | ||
"RowRangesPageFilter expects first NextRange() to be a IntervalRange"); | ||
} | ||
initted = true; | ||
} | ||
|
||
current_page_range_ = page_ranges_itr_->NextRange(); | ||
if (current_page_range_.index() != 0) { | ||
throw ParquetException( | ||
"RowRangesPageFilter expects first NextRange() to be a IntervalRange"); | ||
} | ||
|
||
while (current_row_range_.index() == 0 && | ||
IntervalRangeUtils::IsAfter(std::get<IntervalRange>(current_page_range_), | ||
std::get<IntervalRange>(current_row_range_))) { | ||
current_row_range_ = row_ranges_itr_->NextRange(); | ||
} | ||
|
||
if (current_row_range_.index() != 0) { | ||
return true; | ||
} | ||
|
||
return IntervalRangeUtils::IsBefore(std::get<IntervalRange>(current_page_range_), | ||
std::get<IntervalRange>(current_row_range_)); | ||
} | ||
|
||
private: | ||
const RowRanges& row_ranges_; | ||
const std::shared_ptr<RowRanges> page_ranges_; | ||
std::unique_ptr<RowRanges::Iterator> row_ranges_itr_ = NULLPTR; | ||
std::unique_ptr<RowRanges::Iterator> page_ranges_itr_ = NULLPTR; | ||
std::variant<IntervalRange, BitmapRange, End> current_row_range_ = End(); | ||
std::variant<IntervalRange, BitmapRange, End> current_page_range_ = End(); | ||
bool initted = false; | ||
}; | ||
|
||
// Leaf reader is for primitive arrays and primitive children of nested arrays | ||
class LeafReader : public ColumnReaderImpl { | ||
public: | ||
|
@@ -501,8 +607,79 @@ class LeafReader : public ColumnReaderImpl { | |
|
||
private: | ||
std::shared_ptr<ChunkedArray> out_; | ||
|
||
void checkAndGetPageRanges(const RowRanges& row_ranges, | ||
std::shared_ptr<IntervalRanges>& page_ranges) const { | ||
// check offset exists | ||
const auto rg_pg_index_reader = | ||
ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group()); | ||
|
||
if (!rg_pg_index_reader) { | ||
throw ParquetException( | ||
"Attempting to read with Ranges but Page Index is not found for Row " | ||
"Group: " + | ||
std::to_string(input_->current_row_group())); | ||
} | ||
const auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index()); | ||
|
||
if (!offset_index) { | ||
throw ParquetException( | ||
"Attempting to read with Ranges but Offset index is not found for " | ||
"column: " + | ||
field_->name()); | ||
} | ||
|
||
const auto page_locations = offset_index->page_locations(); | ||
page_ranges = std::make_shared<IntervalRanges>(); | ||
for (size_t i = 0; i < page_locations.size() - 1; i++) { | ||
page_ranges->Add( | ||
{page_locations[i].first_row_index, page_locations[i + 1].first_row_index - 1}); | ||
} | ||
if (page_locations.size() >= 1) { | ||
page_ranges->Add( | ||
{page_locations[page_locations.size() - 1].first_row_index, | ||
ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() - | ||
1}); | ||
} | ||
|
||
if (row_ranges.num_rows() > 0) { | ||
if (row_ranges.last_row() > page_ranges->last_row()) { | ||
throw ParquetException( | ||
"The provided row range " + row_ranges.ToString() + " exceeds last page :" + | ||
IntervalRangeUtils::ToString(page_ranges->GetRanges().back())); | ||
} | ||
} | ||
} | ||
|
||
void NextRowGroup() { | ||
std::unique_ptr<PageReader> page_reader = input_->NextChunk(); | ||
|
||
/// using page index to reduce cost | ||
if (page_reader != nullptr && ctx_->row_ranges_per_rg) { | ||
// reset skipper | ||
record_reader_->reset_record_skipper(); | ||
|
||
const auto& row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; | ||
// if specific row range is provided for this rg | ||
if (row_ranges->num_rows() != 0) { | ||
// Use IntervalRanges to represent pages | ||
std::shared_ptr<IntervalRanges> page_ranges; | ||
checkAndGetPageRanges(*row_ranges, page_ranges); | ||
|
||
// part 1, skip decompressing & decoding unnecessary pages | ||
page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges)); | ||
|
||
// part 2, skip unnecessary rows in necessary pages | ||
record_reader_->set_record_skipper( | ||
std::make_unique<parquet::internal::RecordSkipper>(*page_ranges, | ||
*row_ranges)); | ||
} else { | ||
NextRowGroup(); | ||
return; | ||
} | ||
} | ||
|
||
record_reader_->reset_current_rg_processed_records(); | ||
record_reader_->SetPageReader(std::move(page_reader)); | ||
} | ||
|
||
|
@@ -971,9 +1148,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& | |
|
||
} // namespace | ||
|
||
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups, | ||
const std::vector<int>& column_indices, | ||
std::unique_ptr<RecordBatchReader>* out) { | ||
Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( | ||
const std::vector<int>& row_groups, const std::vector<int>& column_indices, | ||
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg, | ||
std::unique_ptr<RecordBatchReader>* out) { | ||
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); | ||
|
||
if (reader_properties_.pre_buffer()) { | ||
|
@@ -986,7 +1164,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups, | |
|
||
std::vector<std::shared_ptr<ColumnReaderImpl>> readers; | ||
std::shared_ptr<::arrow::Schema> batch_schema; | ||
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema)); | ||
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, row_ranges_per_rg, &readers, | ||
&batch_schema)); | ||
|
||
if (readers.empty()) { | ||
// Just generate all batches right now; they're cheap since they have no columns. | ||
|
@@ -1241,7 +1420,8 @@ Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups( | |
// in a sync context too so use `this` over `self` | ||
std::vector<std::shared_ptr<ColumnReaderImpl>> readers; | ||
std::shared_ptr<::arrow::Schema> result_schema; | ||
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); | ||
RETURN_NOT_OK( | ||
GetFieldReaders(column_indices, row_groups, {}, &readers, &result_schema)); | ||
// OptionalParallelForAsync requires an executor | ||
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -23,6 +23,7 @@ | |||||||||||
#include <memory> | ||||||||||||
#include <vector> | ||||||||||||
|
||||||||||||
#include "parquet/column_reader.h" | ||||||||||||
#include "parquet/file_reader.h" | ||||||||||||
#include "parquet/platform.h" | ||||||||||||
#include "parquet/properties.h" | ||||||||||||
|
@@ -180,6 +181,19 @@ class PARQUET_EXPORT FileReader { | |||||||||||
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, | ||||||||||||
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; | ||||||||||||
|
||||||||||||
/// \brief Return a RecordBatchReader of row groups selected from | ||||||||||||
/// rows_to_return, whose columns are selected by column_indices. | ||||||||||||
/// | ||||||||||||
/// Notice that rows_to_return is file based, it not only decides which row groups to | ||||||||||||
/// read, but also which rows to read in each row group. | ||||||||||||
/// | ||||||||||||
/// | ||||||||||||
Comment on lines
+188
to
+190
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
/// \returns error Status if either rows_to_return or column_indices | ||||||||||||
/// contains an invalid index | ||||||||||||
virtual ::arrow::Status GetRecordBatchReader( | ||||||||||||
const RowRanges& rows_to_return, const std::vector<int>& column_indices, | ||||||||||||
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; | ||||||||||||
|
||||||||||||
/// \brief Return a RecordBatchReader of row groups selected from | ||||||||||||
/// row_group_indices, whose columns are selected by column_indices. | ||||||||||||
/// | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please sort it in alphabetical order.