GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #39608

Open
wants to merge 2 commits into
base: main
from

Conversation

binmahone

@binmahone binmahone commented Jan 15, 2024

Rationale for this change

This is #38867, ported to the main branch.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include "page_index.h"
Member

Is this required? Could we use forward declaration instead?
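
A minimal sketch of the forward-declaration approach asked about above, assuming the header only needs the type by reference or pointer. `PageIndexReader` and `CountPages` are hypothetical stand-ins here and are not taken from the PR; the two halves would normally live in a header and a source file.

```cpp
#include <cstdint>

// ---- what would live in the header ----
namespace demo {

class PageIndexReader;  // forward declaration: no #include of the full header needed

// A declaration that only mentions the type by reference compiles fine
// while the type is still incomplete.
int64_t CountPages(const PageIndexReader& reader);

}  // namespace demo

// ---- what would live in the .cc file (which includes the full definition) ----
namespace demo {

// Hypothetical stand-in for the real class definition.
class PageIndexReader {
 public:
  explicit PageIndexReader(int64_t num_pages) : num_pages_(num_pages) {}
  int64_t num_pages() const { return num_pages_; }

 private:
  int64_t num_pages_;
};

int64_t CountPages(const PageIndexReader& reader) { return reader.num_pages(); }

}  // namespace demo
```

A forward declaration is not enough when the header stores the type by value or calls its members inline; in that case the include has to stay.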

@@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};

// Represent a range to read. The range is inclusive on both ends.
struct IntervalRange {
Member

It would be good to move all row range stuff to a separate parquet/arrow/row_range.h

public:
RowRanges() = default;
virtual ~RowRanges() = default;
virtual size_t RowCount() const = 0;
Member

Suggested change
virtual size_t RowCount() const = 0;
/// \brief Total number of rows in the row ranges.
virtual size_t num_rows() const = 0;

Trivial getter functions like this should use snake_case, and we need to add docstrings to user-facing public APIs.

Same for similar APIs below.

RowRanges() = default;
virtual ~RowRanges() = default;
virtual size_t RowCount() const = 0;
virtual int64_t LastRow() const = 0;
Member

Suggested change
virtual int64_t LastRow() const = 0;
virtual int64_t last_row() const = 0;

For completeness, should we also provide first_row() ?

virtual ~RowRanges() = default;
virtual size_t RowCount() const = 0;
virtual int64_t LastRow() const = 0;
virtual bool IsValid() const = 0;
Member

Do we actually need IsValid()? Is it possible to prohibit constructing invalid row ranges from the constructor?

@@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};

// Represent a range to read. The range is inclusive on both ends.
struct IntervalRange {
static IntervalRange Intersection(const IntervalRange& left,
Member

My personal preference is to simply define it as below

struct IntervalRange {
  int64_t start;
  int64_t end;
};

Then move all operations to a separate IntervalRangeUtil class. Users do not care about these operations.
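
One way to read this suggestion, as a sketch only: the struct stays a plain aggregate and the operations become static helpers. The class name `IntervalRangeUtil` comes from the comment above; the member functions `Count`, `Overlaps`, and the `std::optional` return type of `Intersection` are assumptions for illustration, not the PR's API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>

// Plain data holder; both ends are inclusive, as stated in the PR's comment.
struct IntervalRange {
  int64_t start;
  int64_t end;
};

// Hypothetical helper class holding the operations users do not need to see.
class IntervalRangeUtil {
 public:
  // Number of rows covered by an inclusive range.
  static int64_t Count(const IntervalRange& r) {
    assert(r.start <= r.end);
    return r.end - r.start + 1;
  }

  // True if the two inclusive ranges share at least one row.
  static bool Overlaps(const IntervalRange& a, const IntervalRange& b) {
    return a.start <= b.end && b.start <= a.end;
  }

  // Overlapping part of two ranges, or std::nullopt if they are disjoint.
  static std::optional<IntervalRange> Intersection(const IntervalRange& a,
                                                   const IntervalRange& b) {
    if (!Overlaps(a, b)) return std::nullopt;
    return IntervalRange{std::max(a.start, b.start), std::min(a.end, b.end)};
  }
};
```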

// Represent a set of ranges to read. The ranges are sorted and non-overlapping.
class RowRanges {
public:
RowRanges() = default;
Member

Remove the default ctor?

Member

BTW, we need some utility function to make it easy for users to create row ranges in the common case.


};

class IntervalRanges : public RowRanges {
Member

What about adding a separate row_range_internal.h to hold this class and its friends?

Author

To me, IntervalRanges is not very "internal". Clients need to build their own IntervalRanges out of IntervalRange objects in order to pass them into the API.

Member

Clients are expected to pass RowRanges (not IntervalRanges), and we should support an API like the one below to make creating RowRanges easier:

std::unique_ptr<RowRanges> RowRanges::Make(const std::vector<IntervalRange>& ranges);
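
A rough sketch of how such a factory could keep the concrete class out of the public header. Only the `Make` signature and the `num_rows()` getter come from this review; the validation rules, the `nullptr` return for rejected input, and the hidden `IntervalRanges` implementation are assumptions made for the example.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

class RowRanges {
 public:
  virtual ~RowRanges() = default;

  /// \brief Total number of rows in the row ranges.
  virtual size_t num_rows() const = 0;

  /// \brief Build row ranges from sorted, non-overlapping intervals.
  /// Assumption for this sketch: returns nullptr when the input is rejected.
  static std::unique_ptr<RowRanges> Make(const std::vector<IntervalRange>& ranges);
};

namespace {

// Concrete implementation, hidden from users of the public interface.
class IntervalRanges : public RowRanges {
 public:
  explicit IntervalRanges(std::vector<IntervalRange> ranges)
      : ranges_(std::move(ranges)) {}

  size_t num_rows() const override {
    size_t total = 0;
    for (const auto& r : ranges_) total += static_cast<size_t>(r.end - r.start + 1);
    return total;
  }

 private:
  std::vector<IntervalRange> ranges_;
};

}  // namespace

std::unique_ptr<RowRanges> RowRanges::Make(const std::vector<IntervalRange>& ranges) {
  int64_t prev_end = -1;
  for (const auto& r : ranges) {
    // Reject negative, inverted, unsorted, or overlapping intervals up front,
    // so that no invalid RowRanges object can exist.
    if (r.start < 0 || r.end < r.start || r.start <= prev_end) return nullptr;
    prev_end = r.end;
  }
  return std::make_unique<IntervalRanges>(ranges);
}
```

Validating inside the factory is one possible answer to the earlier question about whether `IsValid()` is needed at all.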

@@ -424,6 +567,10 @@ class PARQUET_EXPORT RecordReader {
/// \brief True if reading dense for nullable columns.
bool read_dense_for_nullable() const { return read_dense_for_nullable_; }

void reset_current_rg_processed_records() { current_rg_processed_records_ = 0; }

void set_record_skipper(const std::shared_ptr<RecordSkipper>& skipper) { skipper_ = skipper; }
Member

Suggested change
void set_record_skipper(const std::shared_ptr<RecordSkipper>& skipper) { skipper_ = skipper; }
void set_record_skipper(std::shared_ptr<RecordSkipper> skipper) { skipper_ = std::move(skipper); }
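
To illustrate why the by-value signature is preferred, here is a small self-contained sketch. `RecordSkipper` and `RecordReader` are reduced to empty stand-ins, and the `record_skipper()` accessor is added only so the effect can be observed; neither reflects the real classes.

```cpp
#include <memory>
#include <utility>

struct RecordSkipper {};  // stand-in for the real class

class RecordReader {
 public:
  // Taking the pointer by value lets the caller choose between a copy
  // (pass an lvalue) and a move (pass an rvalue); the setter then moves
  // the parameter into the member without touching the reference count again.
  void set_record_skipper(std::shared_ptr<RecordSkipper> skipper) {
    skipper_ = std::move(skipper);
  }

  // Accessor added for this example only.
  const std::shared_ptr<RecordSkipper>& record_skipper() const { return skipper_; }

 private:
  std::shared_ptr<RecordSkipper> skipper_;
};
```

With the original `const std::shared_ptr<RecordSkipper>&` parameter, a caller that no longer needs its own pointer would still pay for a reference-count increment and decrement.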

namespace internal {

// A RecordSkipper is used to skip unnecessary rows within each page.
class PARQUET_EXPORT RecordSkipper {
Member

Seems we can use forward declaration here and move it to the cpp file?

Author

I tried this, but it causes an "invalid application of 'sizeof' to an incomplete type" error (https://zhuanlan.zhihu.com/p/321947743).

Member

You may want to apply method III by moving the definition of ~RecordReader() into column_reader.cc
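
A sketch of the technique named above: declare the destructor in the header and define it where the forward-declared type is complete. The example assumes the member is held through `std::unique_ptr`, which is where this error typically appears; the setter quoted earlier in this PR uses `std::shared_ptr`, so the actual trigger in the PR may differ. The class bodies are simplified stand-ins.

```cpp
#include <memory>
#include <utility>

// ---- what would live in column_reader.h ----
class RecordSkipper;  // forward declaration only; the type is incomplete here

class RecordReader {
 public:
  RecordReader();
  ~RecordReader();  // declared here, defined where RecordSkipper is complete

  // Defined out of line as well, because moving into skipper_ may delete
  // the previous object and therefore needs the complete type.
  void set_record_skipper(std::unique_ptr<RecordSkipper> skipper);

  // Comparing against nullptr does not require the complete type.
  bool has_record_skipper() const { return skipper_ != nullptr; }

 private:
  std::unique_ptr<RecordSkipper> skipper_;
};

// ---- what would live in column_reader.cc ----
class RecordSkipper {
 public:
  int rows_to_skip = 0;  // placeholder member
};

RecordReader::RecordReader() = default;
RecordReader::~RecordReader() = default;  // unique_ptr's deleter is instantiated here

void RecordReader::set_record_skipper(std::unique_ptr<RecordSkipper> skipper) {
  skipper_ = std::move(skipper);
}
```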

@github-actions github-actions bot added the "awaiting committer review" label and removed the "awaiting review" label on Jan 18, 2024
Member

@wgtmac wgtmac left a comment

Sorry for the delay. I was too busy this week.

@@ -162,6 +162,7 @@ set(PARQUET_SRCS
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
row_range.cc
Member

Please sort it in alphabetical order.


Comment on lines +21 to +22
#pragma once
#include <variant>
Member

Suggested change
#pragma once
#include <variant>
#pragma once

#include <variant>

We need to leave a blank line here.


namespace parquet {

// Represent a range to read. The range is inclusive on both ends.
Member

Suggested change
// Represent a range to read. The range is inclusive on both ends.
// Represent an interval row range, which is inclusive on both ends.

}

// inclusive
int64_t start = -1;
Member

Do you mean [-1,-1] is an invalid range? It looks a little bit weird to define an invalid range by default.

What about marking an invalid range simply by checking if start >= end? Or we can define a special invalid range like constexpr IntervalRange kInvalidIntervalRange = {-1, -1}; and do not allow creating any other invalid range via the constructor.
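
A small sketch of the sentinel idea. `kInvalidIntervalRange` is the name proposed in the comment above; the free function `IsValid` is hypothetical. Because the PR describes the range as inclusive on both ends, this sketch assumes a single-row range with `start == end` is valid and treats only `start > end` (or a negative start) as invalid, which is slightly narrower than the `start >= end` check mentioned above.

```cpp
#include <cstdint>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

// The single, named invalid value; no default member initializers needed.
constexpr IntervalRange kInvalidIntervalRange = {-1, -1};

// Hypothetical validity check for inclusive ranges.
constexpr bool IsValid(const IntervalRange& r) {
  return r.start >= 0 && r.start <= r.end;
}

static_assert(!IsValid(kInvalidIntervalRange), "the sentinel must be invalid");
```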


AdjustRanges(skip_pages, orig_row_ranges, row_ranges_);
range_iter_ = row_ranges_->NewIterator();
current_range_variant = range_iter_->NextRange();
Member

Suggested change
current_range_variant = range_iter_->NextRange();
current_range_ = range_iter_->NextRange();

Comment on lines +2363 to +2364
const auto ret = current_range.end - current_rg_processed + 1;
return ret;
Member

Suggested change
const auto ret = current_range.end - current_rg_processed + 1;
return ret;
return current_range.end - current_rg_processed + 1;

@@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}

// This is an internal API owned by FileReaderImpl, not exposed in FileReader
Member

Please move it under anonymous namespace.

}
// We'll assign a RowRanges for each RG, even if it's not required to return any rows
std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg =
rows_to_return.SplitByRowRange(rows_per_rg);
Member

Is it too early to split the RowRanges into row groups? We can probably do this lazily for each row group. For example, we can start with row group 0 and test if it falls into the range. If true, compute its overlapping range and move all remaining ranges for the next round (row group 1, 2, etc.)

In this way, we can make GetFieldReaders much simpler and less confusing.
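
The lazy approach described above could look roughly like the following. `LazyRowRangeSplitter` and `NextRowGroup` are hypothetical names; the sketch assumes the file-based ranges are sorted, non-overlapping, and inclusive, and that row groups are visited in file order.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

// Hypothetical helper: consumes file-based ranges one row group at a time.
class LazyRowRangeSplitter {
 public:
  explicit LazyRowRangeSplitter(std::vector<IntervalRange> ranges)
      : remaining_(std::move(ranges)) {}

  // Call once per row group, in file order. Returns the parts of the ranges
  // that fall into this row group, relative to the row group's first row.
  std::vector<IntervalRange> NextRowGroup(int64_t num_rows_in_group) {
    std::vector<IntervalRange> local;
    if (num_rows_in_group <= 0) return local;

    const int64_t rg_first = next_rg_first_row_;
    const int64_t rg_last = rg_first + num_rows_in_group - 1;
    next_rg_first_row_ += num_rows_in_group;

    while (pos_ < remaining_.size() && remaining_[pos_].start <= rg_last) {
      const IntervalRange& r = remaining_[pos_];
      local.push_back({std::max(r.start, rg_first) - rg_first,
                       std::min(r.end, rg_last) - rg_first});
      // A range that extends past this row group stays at the front of the
      // remaining ranges for the next round.
      if (r.end > rg_last) break;
      ++pos_;
    }
    return local;
  }

 private:
  std::vector<IntervalRange> remaining_;
  size_t pos_ = 0;                 // first range not yet fully consumed
  int64_t next_rg_first_row_ = 0;  // file-based index of the next row group's first row
};
```

For example, with ranges [5, 14] and [25, 26] and three row groups of 10 rows each, the calls yield [5, 9], then [0, 4], then [5, 6] in row-group-local coordinates.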

const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::unique_ptr<ColumnReaderImpl>* out) {
Status GetFieldReader(
Member

This function signature now looks strange to me since it contains conflicting parameters const std::vector<int>& row_groups and const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg.

What about splitting file-based row ranges lazily? Then we can re-define this as

  // RowGroups can be either std::vector<int> or RowRanges
  template <typename RowGroups>
  Status GetFieldReader(int i,
                        const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                        const RowGroups& row_groups,
                        std::unique_ptr<ColumnReaderImpl>* out);

Or if you still want to split file-based row ranges eagerly, we can do this:

  // RowGroups can be either std::vector<int> or std::map<int, RowRanges>
  template <typename RowGroups>
  Status GetFieldReader(int i,
                        const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                        const RowGroups& row_groups,
                        std::unique_ptr<ColumnReaderImpl>* out);

Member

Then we can limit the scope of the refactoring work to overloading SomeRowGroupsFactory below at line 221:

ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
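
As a rough illustration of a template that accepts either form of row group selection, the sketch below dispatches on the argument type with `if constexpr`. `RowGroupIndices` and `RowRangesByGroup` are hypothetical; a map from row group index to a vector of intervals stands in for the `std::map<int, RowRanges>` mentioned in the templated `GetFieldReader` suggestion.

```cpp
#include <cstdint>
#include <map>
#include <type_traits>
#include <vector>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

// Hypothetical stand-in for std::map<int, RowRanges>.
using RowRangesByGroup = std::map<int, std::vector<IntervalRange>>;

// RowGroups can be either std::vector<int> or RowRangesByGroup.
// Returns the indices of the row groups that need to be read.
template <typename RowGroups>
std::vector<int> RowGroupIndices(const RowGroups& row_groups) {
  if constexpr (std::is_same_v<RowGroups, std::vector<int>>) {
    // Whole row groups were requested: the indices are the input itself.
    return row_groups;
  } else {
    static_assert(std::is_same_v<RowGroups, RowRangesByGroup>,
                  "RowGroups must be std::vector<int> or RowRangesByGroup");
    // Row ranges were requested: the map keys are the row group indices.
    std::vector<int> indices;
    for (const auto& entry : row_groups) indices.push_back(entry.first);
    return indices;
  }
}
```

The same compile-time dispatch could instead be written as two plain overloads, which is closer to the overloading of `SomeRowGroupsFactory` suggested above.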

Development

Successfully merging this pull request may close these issues.

[C++][Parquet] support passing a RowRange to RecordBatchReader
2 participants