GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #39608
base: main
cpp/src/parquet/column_reader.h
Outdated
@@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include "page_index.h"

Is this required? Could we use a forward declaration instead?
cpp/src/parquet/column_reader.h
Outdated
@@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};

// Represent a range to read. The range is inclusive on both ends.
struct IntervalRange {

It would be good to move all the row range code to a separate parquet/arrow/row_range.h.
cpp/src/parquet/column_reader.h
Outdated
public:
RowRanges() = default;
virtual ~RowRanges() = default;
virtual size_t RowCount() const = 0;

Suggested change:
- virtual size_t RowCount() const = 0;
+ /// \brief Total number of rows in the row ranges.
+ virtual size_t num_rows() const = 0;
Trivial getter functions like this should use snake case, and we need to add docstrings to user-facing public APIs.
The same applies to similar APIs below.
cpp/src/parquet/column_reader.h
Outdated
RowRanges() = default;
virtual ~RowRanges() = default;
virtual size_t RowCount() const = 0;
virtual int64_t LastRow() const = 0;

Suggested change:
- virtual int64_t LastRow() const = 0;
+ virtual int64_t last_row() const = 0;
For completeness, should we also provide first_row()?
cpp/src/parquet/column_reader.h
Outdated
virtual ~RowRanges() = default;
virtual size_t RowCount() const = 0;
virtual int64_t LastRow() const = 0;
virtual bool IsValid() const = 0;

Do we actually need IsValid()? Is it possible to prohibit constructing invalid row ranges in the constructor?
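One way to make `IsValid()` unnecessary is to validate at construction time. The following is only a sketch of that idea: `CheckedRange` and its `Make` function are hypothetical names, not part of the PR, and real Arrow code would more likely return an `arrow::Result` than a `std::optional`.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical sketch: a range type that cannot exist in an invalid state.
class CheckedRange {
 public:
  // Returns std::nullopt instead of constructing an invalid range.
  static std::optional<CheckedRange> Make(int64_t start, int64_t end) {
    if (start < 0 || end < start) return std::nullopt;
    return CheckedRange(start, end);
  }

  int64_t start() const { return start_; }
  int64_t end() const { return end_; }  // inclusive

 private:
  // Private: the only way to obtain an instance is through Make().
  CheckedRange(int64_t start, int64_t end) : start_(start), end_(end) {}

  int64_t start_;
  int64_t end_;
};
```

With the constructor private, every `CheckedRange` a caller holds has already passed validation, so no separate validity query is needed.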
cpp/src/parquet/column_reader.h
Outdated
@@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};

// Represent a range to read. The range is inclusive on both ends.
struct IntervalRange {
static IntervalRange Intersection(const IntervalRange& left,

My personal preference is to simply define it as below:
struct IntervalRange {
  int64_t start;
  int64_t end;
};
Then move all operations to a separate IntervalRangeUtil class. Users do not care about these operations.
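A minimal sketch of what that split could look like. The struct matches the one proposed above; the members of `IntervalRangeUtil` (`Count`, `Intersection`) and the use of `std::optional` for the disjoint case are assumptions made for illustration, not the PR's actual API.

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>

// Plain data carrier; both ends are inclusive, as in the PR.
struct IntervalRange {
  int64_t start;
  int64_t end;
};

// Hypothetical helper class; the member names are illustrative only.
class IntervalRangeUtil {
 public:
  // Number of rows covered by an inclusive range.
  static int64_t Count(const IntervalRange& r) { return r.end - r.start + 1; }

  // Returns the overlap of two inclusive ranges, or nullopt if they are disjoint.
  static std::optional<IntervalRange> Intersection(const IntervalRange& left,
                                                   const IntervalRange& right) {
    const int64_t start = std::max(left.start, right.start);
    const int64_t end = std::min(left.end, right.end);
    if (start > end) return std::nullopt;
    return IntervalRange{start, end};
  }
};
```

Keeping the struct free of member functions means users only see two fields, while the reader implementation can call the helpers internally.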
cpp/src/parquet/column_reader.h
Outdated
// Represent a set of ranges to read. The ranges are sorted and non-overlapping.
class RowRanges {
public:
RowRanges() = default;

Remove the default ctor?

BTW, we need a utility function that makes it easy for users to create row ranges in the common case.
cpp/src/parquet/column_reader.h
Outdated
};

class IntervalRanges : public RowRanges {

What about adding a separate row_range_internal.h to hold this class and its friends?

To me, IntervalRanges is not very "internal". Clients need to initialize their own IntervalRanges from classes like IntervalRange to pass into the API.

Clients are expected to pass RowRanges (not IntervalRanges), and we should support an API like the one below to facilitate creating RowRanges:
std::unique_ptr<RowRanges> RowRanges::Make(const std::vector<IntervalRange>& ranges);
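A rough sketch of how such a factory could keep `IntervalRanges` out of the public surface. The `Make` signature is the one suggested above; the validation rules and the choice to return `nullptr` on bad input are assumptions for illustration (Arrow code would normally report the error through `Status` or `Result`).

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

// Abstract interface that clients would program against.
class RowRanges {
 public:
  virtual ~RowRanges() = default;
  virtual size_t num_rows() const = 0;

  // Assumed behavior: returns nullptr if the input is unsorted, overlapping,
  // or contains a negative or inverted range.
  static std::unique_ptr<RowRanges> Make(const std::vector<IntervalRange>& ranges);
};

namespace {

// Concrete implementation, hidden from clients.
class IntervalRanges : public RowRanges {
 public:
  explicit IntervalRanges(std::vector<IntervalRange> ranges)
      : ranges_(std::move(ranges)) {}

  size_t num_rows() const override {
    size_t total = 0;
    for (const auto& r : ranges_) total += static_cast<size_t>(r.end - r.start + 1);
    return total;
  }

 private:
  std::vector<IntervalRange> ranges_;
};

}  // namespace

std::unique_ptr<RowRanges> RowRanges::Make(const std::vector<IntervalRange>& ranges) {
  int64_t prev_end = -1;
  for (const auto& r : ranges) {
    // Each range must start after the previous one ends and must not be inverted.
    if (r.start <= prev_end || r.end < r.start) return nullptr;
    prev_end = r.end;
  }
  return std::make_unique<IntervalRanges>(ranges);
}
```

Callers only ever name `RowRanges` and `IntervalRange`, so the concrete class can move to an internal header or a .cc file without affecting them.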
cpp/src/parquet/column_reader.h
Outdated
@@ -424,6 +567,10 @@ class PARQUET_EXPORT RecordReader {
/// \brief True if reading dense for nullable columns.
bool read_dense_for_nullable() const { return read_dense_for_nullable_; }

void reset_current_rg_processed_records() { current_rg_processed_records_ = 0; }

void set_record_skipper(const std::shared_ptr<RecordSkipper>& skipper) { skipper_ = skipper; }

Suggested change:
- void set_record_skipper(const std::shared_ptr<RecordSkipper>& skipper) { skipper_ = skipper; }
+ void set_record_skipper(std::shared_ptr<RecordSkipper> skipper) { skipper_ = std::move(skipper); }
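The suggested signature is the usual "sink parameter" idiom. The sketch below shows its effect on reference counts; `Reader` and the empty `RecordSkipper` are placeholders for illustration, not the PR's classes.

```cpp
#include <memory>
#include <utility>

struct RecordSkipper {};  // placeholder for the PR's class

class Reader {
 public:
  // Taking the shared_ptr by value lets the caller choose: passing an lvalue
  // costs one copy, passing an rvalue is moved all the way into the member.
  void set_record_skipper(std::shared_ptr<RecordSkipper> skipper) {
    skipper_ = std::move(skipper);
  }

  long use_count() const { return skipper_.use_count(); }

 private:
  std::shared_ptr<RecordSkipper> skipper_;
};
```

Passing a temporary such as `std::make_shared<RecordSkipper>()` leaves the reader as the sole owner, while passing a named `shared_ptr` leaves ownership shared between the caller and the reader.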
namespace internal {

// A RecordSkipper is used to skip uncessary rows within each pages.
class PARQUET_EXPORT RecordSkipper {

It seems we can use a forward declaration here and move the definition to the .cc file?

I tried this, but it causes "invalid application of 'sizeof' to an incomplete type" (https://zhuanlan.zhihu.com/p/321947743).

You may want to apply method III by moving the definition of ~RecordReader() into column_reader.cc.
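For readers unfamiliar with the technique, here is a single-file sketch of an out-of-line destructor, assuming the member in question is a `std::unique_ptr` (the sizeof error quoted above is the typical symptom when the destructor of a `unique_ptr` member is instantiated while the pointee type is still incomplete). The class bodies are simplified stand-ins, not the PR's code.

```cpp
#include <memory>

// --- what would live in the header ---
class RecordSkipper;  // forward declaration only; the type is incomplete here

class RecordReader {
 public:
  RecordReader();
  ~RecordReader();  // declared here, defined where RecordSkipper is complete
  bool has_skipper() const;

 private:
  std::unique_ptr<RecordSkipper> skipper_;
};

// --- what would live in the .cc file ---
class RecordSkipper {
 public:
  int pages_skipped = 0;
};

RecordReader::RecordReader() : skipper_(std::make_unique<RecordSkipper>()) {}

// unique_ptr's deleter is instantiated here, where sizeof(RecordSkipper) is known.
RecordReader::~RecordReader() = default;

bool RecordReader::has_skipper() const { return skipper_ != nullptr; }
```

If the destructor were left implicit, or defaulted inside the class body, it would be generated in every translation unit that includes the header, and each of those would need the full definition of `RecordSkipper`.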
Sorry for the delay. I was too busy this week.
@@ -162,6 +162,7 @@ set(PARQUET_SRCS
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
row_range.cc

Please sort it in alphabetical order.
#pragma once
#include <variant>

Suggested change:
- #pragma once
- #include <variant>
+ #pragma once
+
+ #include <variant>
We need to leave a blank line here.
namespace parquet {

// Represent a range to read. The range is inclusive on both ends.

Suggested change:
- // Represent a range to read. The range is inclusive on both ends.
+ // Represent an interval row range, which is inclusive on both ends.
}

// inclusive
int64_t start = -1;

Do you mean [-1, -1] is an invalid range? It looks a little bit weird to define an invalid range by default.
What about marking an invalid range simply by checking whether start >= end? Or we can define a special invalid range like constexpr IntervalRange kInvalidIntervalRange = {-1, -1}; and not allow creating any other invalid range via the constructor.
AdjustRanges(skip_pages, orig_row_ranges, row_ranges_);
range_iter_ = row_ranges_->NewIterator();
current_range_variant = range_iter_->NextRange();

Suggested change:
- current_range_variant = range_iter_->NextRange();
+ current_range_ = range_iter_->NextRange();
const auto ret = current_range.end - current_rg_processed + 1;
return ret;

Suggested change:
- const auto ret = current_range.end - current_rg_processed + 1;
- return ret;
+ return current_range.end - current_rg_processed + 1;
@@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}

// This is a internal API owned by FileReaderImpl, not exposed in FileReader

Please move it into an anonymous namespace.
}
// We'll assign a RowRanges for each RG, even if it's not required to return any rows
std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg =
rows_to_return.SplitByRowRange(rows_per_rg);

Is it too early to split the RowRanges into row groups? We can probably do this lazily for each row group. For example, we can start with row group 0 and test whether it falls into the range. If it does, compute its overlapping range and carry all remaining ranges over to the next round (row group 1, 2, etc.).
In this way, we can make GetFieldReaders much simpler and less confusing.
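The lazy approach described above could be sketched roughly as follows. `LazyRowRangeSplitter` and `Next` are hypothetical names; the sketch assumes the input ranges are sorted, non-overlapping, and expressed as file-based row indices, and that `Next` is called once per row group in file order.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

// Hypothetical cursor that hands out the ranges of one row group at a time.
class LazyRowRangeSplitter {
 public:
  explicit LazyRowRangeSplitter(std::vector<IntervalRange> ranges)
      : ranges_(std::move(ranges)) {}

  // Returns the ranges that fall into the next row group, rebased so that
  // row 0 is the first row of that row group.
  std::vector<IntervalRange> Next(int64_t rows_in_row_group) {
    const int64_t rg_start = next_rg_start_;
    const int64_t rg_end = rg_start + rows_in_row_group - 1;
    next_rg_start_ += rows_in_row_group;

    std::vector<IntervalRange> out;
    while (pos_ < ranges_.size() && ranges_[pos_].start <= rg_end) {
      const IntervalRange& r = ranges_[pos_];
      const int64_t start = std::max(r.start, rg_start);
      const int64_t end = std::min(r.end, rg_end);
      if (start <= end) out.push_back({start - rg_start, end - rg_start});
      if (r.end > rg_end) break;  // the remainder belongs to a later row group
      ++pos_;
    }
    return out;
  }

 private:
  std::vector<IntervalRange> ranges_;
  size_t pos_ = 0;             // first range not yet fully consumed
  int64_t next_rg_start_ = 0;  // file-based index of the next row group's first row
};
```

A row group that overlaps no range simply gets an empty vector, so the caller can skip it without a precomputed per-row-group table.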
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::unique_ptr<ColumnReaderImpl>* out) {
Status GetFieldReader(

This function signature now looks strange to me since it contains conflicting parameters: const std::vector<int>& row_groups and const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg.
What about splitting file-based row ranges lazily? Then we can redefine this as
// RowGroups can be either std::vector<int> or RowRanges
template <typename RowGroups>
Status GetFieldReader(int i,
                      const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                      const RowGroups& row_groups,
                      std::unique_ptr<ColumnReaderImpl>* out);
Or if you still want to split file-based row ranges eagerly, we can do this:
// RowGroups can be either std::vector<int> or std::map<int, RowRanges>
template <typename RowGroups>
Status GetFieldReader(int i,
                      const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                      const RowGroups& row_groups,
                      std::unique_ptr<ColumnReaderImpl>* out);
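To illustrate how a single templated entry point can serve both selection types, here is a small C++17 sketch. `DescribeSelection` and the two-field `RowRanges` stand-in are hypothetical and exist only to show the compile-time dispatch; they are not the reader's real API.

```cpp
#include <cstdint>
#include <string>
#include <type_traits>
#include <vector>

// Stand-in for the PR's RowRanges; only what this sketch needs.
struct RowRanges {
  int64_t first_row;  // inclusive
  int64_t last_row;   // inclusive
};

// RowGroups can be either std::vector<int> or RowRanges.
template <typename RowGroups>
std::string DescribeSelection(const RowGroups& row_groups) {
  if constexpr (std::is_same_v<RowGroups, std::vector<int>>) {
    // Selection by row group index.
    return std::to_string(row_groups.size()) + " row groups";
  } else {
    static_assert(std::is_same_v<RowGroups, RowRanges>,
                  "RowGroups must be std::vector<int> or RowRanges");
    // Selection by file-based row range.
    return std::to_string(row_groups.last_row - row_groups.first_row + 1) + " rows";
  }
}
```

Because the branch is resolved at compile time, callers cannot pass both kinds of selection at once, which removes the conflict between the two parameters noted above.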

Then we can limit the scope of the refactoring work to overloading SomeRowGroupsFactory below at line 221:
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
Rationale for this change
This is #38867 for the main branch.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?