Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-32723: [C++][Parquet] Add option to use LARGE* variants of binary types #35825

Open
wants to merge 69 commits into
base: main
Choose a base branch
from

Conversation

arthurpassos
Copy link

@arthurpassos arthurpassos commented May 30, 2023

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions
Copy link

⚠️ GitHub issue #32723 has been automatically assigned in GitHub to PR creator.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels May 30, 2023
@kou kou changed the title GH-32723: [C++] Add option to use LARGE* variants of binary types GH-32723: [C++][Parquet] Add option to use LARGE* variants of binary types May 30, 2023
Copy link
Member

@mapleFU mapleFU left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The skeleton is ok here, but I think too much duplicated code is introduced. Would you mind using template to simplify the code?

cpp/src/parquet/arrow/reader.cc Outdated Show resolved Hide resolved
cpp/src/parquet/column_reader.cc Outdated Show resolved Hide resolved
@arthurpassos
Copy link
Author

The skeleton is ok here, but I think too much duplicated code is introduced. Would you mind using template to simplify the code?

Sure, I'll first add some tests and then look into this. If you have any suggestions on what code you would like to be templated, pls let me know

@arthurpassos
Copy link
Author

Just added a test that depends on a parquet test file, pr for the file: apache/parquet-testing#38

Copy link
Member

@pitrou pitrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update but... sorry, I'm going to be negative again. I think it is still a problem that public APIs are exposing hacks or options that are only for internal use (also, the entire use case for this PR is rather niche).

I would rather if a cleaner approach is chosen (as already suggested some months ago). But if not, I would request that the new publicly-exposed APIs are clearly dissuasive for the user (except the new ArrowReaderProperties settings, of course). Details below.

struct IsByteArrayType<ByteArrayType> : std::true_type {};

template <>
struct IsByteArrayType<LargeByteArrayType> : std::true_type {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite verbose. You can define a simple constexpr function instead:

template <typename T>
constexpr bool IsByteArrayType() {
  return std::is_same_v<T, ByteArrayType> || std::is_same_v<T, LargeByteArrayType>;
}

cpp/src/parquet/encoding.h Show resolved Hide resolved
@@ -1342,6 +1353,23 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {

using TestStringParquetIO = TestParquetIO<::arrow::StringType>;

#if defined(_WIN64) || defined(__LP64__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this condition. Which platforms is it excluding and why?

Large binary data is supposed to work on every platform, so there should be no reason to skip some platforms here.

@@ -1369,6 +1397,7 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {

using TestLargeBinaryParquetIO = TestParquetIO<::arrow::LargeBinaryType>;

#if defined(_WIN64) || defined(__LP64__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, it does not seem right that you are restricting tests that used to work on every platform (and that have no obvious reason to fail on some platforms).

@@ -3862,6 +3905,19 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) {
TryReadDataFile(path, ::arrow::StatusCode::IOError);
}

#if defined(ARROW_WITH_BROTLI) && defined(__LP64__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand what __LP64__ is doing here. If you really want to single out 64-bit platforms, you could instead do something like:

  if (sizeof(void*) < 8) {
    GTEST_SKIP() << "Test only runs on 64-bit platforms as it allocates more than 2GB RAM";
  }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see that this test takes 18 seconds in debug mode. This seems a bit excessive :-/

@@ -321,7 +321,8 @@ class PARQUET_EXPORT RecordReader {
static std::shared_ptr<RecordReader> Make(
const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
bool read_dictionary = false, bool read_dense_for_nullable = false);
bool read_dictionary = false, bool read_dense_for_nullable = false,
bool use_large_binary_variants = false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you added an argument, you have to document it in the docstring above.

Besides, it makes me uneasy that an Arrow-specific argument is making its way here.

In any case, use_large_binary_variants is not easy to understand for the user, as Parquet doesn't have any "large" binary types. It could have "arrow" in the name, or stress that this is a purely internal flag (are users expected to pass it?).

* LargeBinaryType which use int64_t as the offset type, we define LargeByteArrayType
* below to indicate parquet reader/writer to use those large variants from arrow.
* */
struct LargeByteArrayType : public ByteArrayType {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again it makes me uneasy that we are adding a fake Parquet type at the toplevel just to deal with some details of Arrow reading.

Minimally I would request that this is clearly flagged as internal and not a real type:

namespace internal {
  // DO NOT USE THIS in third-party code.
  //
  // This is a type marker passed internally when reading Parquet data
  // to Arrow, if the user requested to read binary data as large binary
  // (with `ArrowReaderProperties::use_large_binary_variants`).
  //
  // Also, this might be removed if we find a better way of implementing
  // that feature.
  struct FakeLargeByteArrayType : public ByteArrayType {};
}


template <typename BAT>
struct ArrowBinaryHelperBase {
explicit ArrowBinaryHelperBase(typename EncodingTraits<BAT>::Accumulator* out) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FTR, this will probably conflict with the changes in the DeltaByteArray encoder PR, which is far more desirable than this PR. So I would suggest to postpone this PR even if we deem it ready. @wgtmac

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

Comment on lines +163 to +164
/// \brief Internal helper class for decoding BYTE_ARRAY data where we can
/// overflow the capacity of a single arrow::BinaryArray
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this entirely obsolete? This PR should actually enable us to write using Accumulator = ::arrow::LargeBinaryBuilder here?

@@ -437,14 +456,16 @@ std::unique_ptr<typename EncodingTraits<DType>::Encoder> MakeTypedEncoder(
PARQUET_EXPORT
std::unique_ptr<Decoder> MakeDecoder(
Type::type type_num, Encoding::type encoding, const ColumnDescriptor* descr = NULLPTR,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
bool use_large_binary_variants = false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here again I think the naming should stress the peculariaties of this new option. I'm not sure we want users to start relying on this in third-party code, do we?

@arthurpassos
Copy link
Author

Thanks for the update but... sorry, I'm going to be negative again. I think it is still a problem that public APIs are exposing hacks or options that are only for internal use (also, the entire use case for this PR is rather niche).

I would rather if a cleaner approach is chosen (as already suggested some months ago). But if not, I would request that the new publicly-exposed APIs are clearly dissuasive for the user (except the new ArrowReaderProperties settings, of course). Details below.

I assume this is the approach you refer to as already suggested & cleaner. If that's so, can you elaborate on that? I assume I would have to define a LargeAccumulator type for each type_trait, that includes: boolean, int32, int64 and so on. I can't trick the compiler by doing using LargeAccumulator = Accumulator; because that would resolve to the same signature and not a proper overload.

@arthurpassos
Copy link
Author

@wgtmac @mapleFU @pitrou Since the issue is in arrow layer and not in parquet layer, do you think it would be possible to read such file using the parquet lower level API offered by arrow? Do you have any examples?

I am not sure what to do at this point

@mapleFU
Copy link
Member

mapleFU commented Aug 15, 2023

@arthurpassos What kind of api are you just using? Since arrow has dataset, but it can be read using raw parquet::arrow api, I just want to make sure the api you're currently using

@arthurpassos
Copy link
Author

I am one of the contributors of ClickHouse, a column oriented database. We rely on arrow to read parquet files.

Code has changed since I last worked on it, but I can see a combination of the following classes/ methods arrow::ipc::RecordBatchFileReader::Open, arrow::RecordBatchReader::Next, arrow::RecordBatchReader::ReadRecordBatch and arrow::Table::FromRecordBatches. That depends on whether it is set to read with stream or non-stream reader.

You can find the full code here: https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp#L31

@arthurpassos
Copy link
Author

@mapleFU I see you reacted with a thumbs up. I assume you mean that's the correct API? Unfortunately this runs into the issue this PR tries to address, could you guide me on how to use the lower level API to go around this problem?

@mapleFU
Copy link
Member

mapleFU commented Aug 15, 2023

I think you change is reasonable, but I guess it would take sometime to merge it. So instead I guess:

  • If the string is greater than INT32_MAX, maybe dict read is not allowed before you patch being merged. Maybe we can cast to dictionary as a workaround
  • If the sum of string is greater than INT32_MAX, we can try to (1) shrink the batchSize in ClickHouse (2) in arrow, allow early return if we touch the limit?

@pitrou @wgtmac @arthurpassos What do you think of this?

@pitrou
Copy link
Member

pitrou commented Aug 15, 2023

I certainly think that if you hit a 2GB column chunk limit when reading Parquet data, you're probably using a way too large batch size. Is there a use case where it makes sense?

@wgtmac
Copy link
Member

wgtmac commented Aug 15, 2023

I don't think we have any low-level API other than parquet::arrow::FileReader to read parquet files into arrow::RecordBatch. CMIW, those examples you mentioned above seems are reading from Arrow IPC files. @arthurpassos

@arthurpassos
Copy link
Author

I don't think we have any low-level API other than parquet::arrow::FileReader to read parquet files into arrow::RecordBatch. CMIW, those examples you mentioned above seems are reading from Arrow IPC files. @arthurpassos

The way it works now is: parquet format -> arrow format -> clickhouse format. I was thinking we could remove the arrow part by directly converting from parquet format to clickhouse format with the lower level APIs, just not sure it's possible. Hence the question

@pitrou
Copy link
Member

pitrou commented Aug 15, 2023

If you're not using the Arrow format internally, then sure, it's possible. You can take a look at the TypedColumnReader interface.

@arthurpassos
Copy link
Author

If you're not using the Arrow format internally, then sure, it's possible. You can take a look at the TypedColumnReader interface.

We are using it, but I am exploring other options

@arthurpassos
Copy link
Author

I certainly think that if you hit a 2GB column chunk limit when reading Parquet data, you're probably using a way too large batch size. Is there a use case where it makes sense?

Regardless of the use case, shouldn't arrow support it simply because parquet supports it?

@pitrou
Copy link
Member

pitrou commented Aug 15, 2023

I certainly think that if you hit a 2GB column chunk limit when reading Parquet data, you're probably using a way too large batch size. Is there a use case where it makes sense?

Regardless of the use case, shouldn't arrow support it simply because parquet supports it?

I'm not sure arguing about ideals is useful :-) While adding the feature would definitely be reasonable, it's also not important enough that we should accept warts in the API in its name, IMHO.

We are using it, but I am exploring other options

Out of curiosity, would you care to explain why? The main impediment to TypedColumnReader AFAICT is that it's quite low-level. Are there other reasons?

@arthurpassos
Copy link
Author

Out of curiosity, would you care to explain why? The main impediment to TypedColumnReader AFAICT is that it's quite low-level. Are there other reasons?

I meant that TypedColumnReader is one of the "other options". The main option would be to continue using arrow layer, since our code relies on that. But well, the issue still persists.

@arthurpassos
Copy link
Author

I'm not sure arguing about ideals is useful :-) While adding the feature would definitely be reasonable, it's also not important enough that we should accept warts in the API in its name, IMHO.

@pitrou What do you think would be the optimal solution for this issue?

@mapleFU
Copy link
Member

mapleFU commented Aug 16, 2023

      auto estimatedRowSize = dataSource_->estimatedRowSize();
      readBatchSize_ =
          estimatedRowSize == connector::DataSource::kUnknownRowSize
          ? outputBatchRows()
          : outputBatchRows(estimatedRowSize);

An config to adjust batch size might helps

@arthurpassos
Copy link
Author

      auto estimatedRowSize = dataSource_->estimatedRowSize();
      readBatchSize_ =
          estimatedRowSize == connector::DataSource::kUnknownRowSize
          ? outputBatchRows()
          : outputBatchRows(estimatedRowSize);

An config to adjust batch size might helps

I think batch_size is being set here: https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp#L101. It defaults to 8192.

@mapleFU
Copy link
Member

mapleFU commented Aug 16, 2023

Yes ClickHouse use default size 8192, but personally I think it's better to adjust batchSize by storage schema when reading from file.

@slobodan-ilic
Copy link
Contributor

@arthurpassos Hi Arthur. Is there a roadmap for merging this PR? I think it might be connected to an issue we're experiencing in our pipeline, which I've documented in #38513. I've created a separate issue because the conditions are a little bit different than in the one that was automatically assigned to this PR (#32723).

@arthurpassos
Copy link
Author

@arthurpassos Hi Arthur. Is there a roadmap for merging this PR? I think it might be connected to an issue we're experiencing in our pipeline, which I've documented in #38513. I've created a separate issue because the conditions are a little bit different than in the one that was automatically assigned to this PR (#32723).

Tbh, there is not a roadmap. If I understand correctly, this patch is unwanted by core maintainers as is and the alternative approach is not even guaranteed to work.

@felipecrv
Copy link
Contributor

This, if completed, would fix #39682

@mapleFU
Copy link
Member

mapleFU commented Apr 9, 2024

Recently I've revisit this part of code. Maybe we can have a thought on this since single string wouldn't greater than 2GB, maybe the accumulator could still be StringBuilder/BinaryBuilder, limit by 2GB. And if user uses LargeBinary, ::parquet::arrow:: would merging them to LargeBinary/LargeString/Dictionary. Would that API ok?

@felipecrv @pitrou

@pitrou
Copy link
Member

pitrou commented Apr 9, 2024

@mapleFU Could you make a more precise proposal so that we can understand a bit better? What would the API be like, concretely?

@mapleFU
Copy link
Member

mapleFU commented Apr 9, 2024

Will create a separate issue for that

@mapleFU
Copy link
Member

mapleFU commented Apr 9, 2024

Create an issue for that: #41104

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[C++][Parquet] Support nested data conversions for chunked array
7 participants