GH-47628: [C++][Parquet] Implement basic parquet file rewriter (#47775)
HuaHuaY wants to merge 1 commit into apache:main
Conversation
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? See also:
@pitrou @adamreeve @mapleFU Do you have any suggestions about this draft? Is there any efficient way to merge two parquet files' schemas?
mapleFU
left a comment
Emm, I'm thinking that just reusing the current code is an OK way to go, since this logic in the current impl would be a bit hacky with the current interface...
wgtmac
left a comment
I haven't reviewed all the changes yet and will progressively post my comments.
wgtmac
left a comment
The general workflow of the rewriter looks good to me. However, I don't believe we should directly manipulate the thrift objects.
```cpp
RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
                 std::shared_ptr<ArrowOutputStream> sink,
                 const RewriterProperties* props,
                 std::shared_ptr<RowGroupReader> row_group_reader,
```
Perhaps introduce a RowGroupContext to hold all row group xxx readers?

I think it doesn't bring much benefit and requires one more step to unwrap the wrapper class.
Pull request overview
Draft implementation of a C++ Parquet “file rewriter” that can rewrite/concatenate/join Parquet files by copying encoded bytes and re-emitting metadata (optionally including page indexes and bloom filters), avoiding full decode/re-encode.
Changes:
- Add `ParquetFileRewriter` + `RewriterProperties` and implement basic concat (vertically stacking row groups) + join (horizontally combining columns) rewriting.
- Add metadata copying helpers (`to_thrift`, new builder entrypoints, new `ToThrift` overloads) to support fast metadata reconstruction.
- Add Arrow-based tests for roundtrip rewriting scenarios (simple, concat, join, concat+join) and wire them into CMake.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| cpp/src/parquet/thrift_internal.h | Add ToThrift helpers for page-index-related structs. |
| cpp/src/parquet/properties.h | Make ReaderProperties::GetStream const; add RewriterProperties + default factory decl. |
| cpp/src/parquet/properties.cc | Update ReaderProperties::GetStream definition to const. |
| cpp/src/parquet/page_index.h | Extend PageIndexBuilder API to allow setting pre-built indexes. |
| cpp/src/parquet/page_index.cc | Implement new PageIndexBuilder setters and mixed builder/prebuilt serialization. |
| cpp/src/parquet/metadata.h | Add start_offset(), expose to_thrift(), extend RowGroupMetaDataBuilder. |
| cpp/src/parquet/metadata.cc | Implement new metadata accessors and new row-group column-chunk injection path. |
| cpp/src/parquet/file_rewriter.h | New public rewriter API (ParquetFileRewriter). |
| cpp/src/parquet/file_rewriter.cc | Core rewriting implementation (copy streams, concat/join logic, index/bloom handling). |
| cpp/src/parquet/bloom_filter_writer.h | Extend BloomFilterBuilder with InsertBloomFilter. |
| cpp/src/parquet/bloom_filter_writer.cc | Implement InsertBloomFilter. |
| cpp/src/parquet/arrow/test_util.h | Add helper to write a table into a Parquet buffer for tests. |
| cpp/src/parquet/arrow/arrow_rewriter_test.cc | Add Arrow-level rewriter roundtrip tests. |
| cpp/src/parquet/CMakeLists.txt | Add new source (file_rewriter.cc) and new test target. |
```cpp
if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
  auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
  auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
  if (column_index != nullptr) {
    page_index_builder->SetColumnIndex(column_ordinal_, column_index);
  }
  if (offset_index != nullptr) {
    page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
  }
}

if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
  auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
  if (bloom_filter != nullptr) {
    bloom_filter_builder->InsertBloomFilter(column_ordinal_,
                                            std::move(bloom_filter));
  }
}
```
The new rewriter code path copies page indexes (SetColumnIndex/SetOffsetIndex) and bloom filters (InsertBloomFilter) when enabled, but the added Arrow-level roundtrip tests only validate the decoded table contents. Consider extending tests to assert that rewritten files actually contain the expected page index / bloom filter structures (e.g., via ParquetFileReader::GetPageIndexReader / GetBloomFilterReader and checking non-null indexes/locations), so regressions in metadata copying/offset shifting are caught.
```diff
 std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
-                                            int64_t start, int64_t num_bytes);
+                                            int64_t start, int64_t num_bytes) const;
```
Changing ReaderProperties::GetStream to be const changes the mangled symbol and will break ABI for downstream code compiled against earlier versions of the library. If ABI compatibility matters here, consider keeping the old non-const overload (forwarding to the const implementation) instead of changing the existing signature in-place.
Suggested change:

```cpp
// Non-const overload kept for ABI compatibility. It forwards to the const
// implementation introduced in a later version.
std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
                                            int64_t start, int64_t num_bytes) {
  return static_cast<const ReaderProperties*>(this)->GetStream(source, start, num_bytes);
}
```
cpp/src/parquet/metadata.h (outdated)

```cpp
const void* to_thrift() const;
```
ColumnChunkMetaData::to_thrift() exposes an untyped pointer to an internal Thrift struct as part of the public metadata API, which couples consumers to the internal representation and is easy to misuse (wrong casts or lifetime assumptions leading to UB). Prefer an internal-only accessor, return a typed reference/pointer to the concrete thrift type in an internal header, or provide a dedicated cloning/copy helper on the builder to avoid exposing raw thrift at all.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
```cpp
class PARQUET_EXPORT ParquetFileRewriter {
 public:
  struct PARQUET_EXPORT Contents {
    virtual ~Contents() = default;
    virtual void Close() = 0;
    virtual void Rewrite() = 0;
  };

  ParquetFileRewriter();
  ~ParquetFileRewriter();

  static std::unique_ptr<ParquetFileRewriter> Open(
      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
      std::shared_ptr<ArrowOutputStream> sink,
      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
      std::shared_ptr<const ::arrow::KeyValueMetadata> sink_metadata = NULLPTR,
      std::shared_ptr<RewriterProperties> props = default_rewriter_properties());

  void Open(std::unique_ptr<Contents> contents);
  void Close();

  void Rewrite();
```
The ParquetFileRewriter class and its public methods (Open, Close, Rewrite) lack documentation comments. Since this is a new public API (marked with PARQUET_EXPORT), these methods should have documentation describing their purpose, parameters, return values, and any exceptions that may be thrown. This is especially important for the Open method with its complex nested vector parameters.
```diff
@@ -485,6 +488,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   ~RowGroupMetaDataBuilder();

   ColumnChunkMetaDataBuilder* NextColumnChunk();
```
The new overload of NextColumnChunk that takes ColumnChunkMetaData and a shift value lacks documentation. This method allows adding column metadata without creating a builder, which is a key feature for the rewriter's fast-copy optimization. Documentation should explain the parameters (especially shift for offset adjustment) and when to use this method versus the builder-based approach.
Suggested change:

```cpp
ColumnChunkMetaDataBuilder* NextColumnChunk();
// Add an existing column chunk metadata object to the row group without
// constructing it through a ColumnChunkMetaDataBuilder.
//
// This overload is intended for fast-copy / rewriting scenarios where
// column chunks are reused from another file and their metadata has
// already been finalized.
//
// Parameters:
// - cc_metadata: Ownership of the provided ColumnChunkMetaData is
//   transferred to the RowGroupMetaDataBuilder.
// - shift: Byte offset delta to apply to all file-relative offsets in
//   cc_metadata (for example, when appending row groups at a different
//   position in the target file). Use 0 if no adjustment is needed.
//
// For column chunks produced by this writer, prefer the builder-based
// NextColumnChunk() API above.
```
```cpp
class PARQUET_EXPORT RewriterProperties {
 public:
  class Builder {
   public:
    Builder()
        : pool_(::arrow::default_memory_pool()),
          writer_properties_(default_writer_properties()),
          reader_properties_(default_reader_properties()) {}

    explicit Builder(const RewriterProperties& properties)
        : pool_(properties.memory_pool()),
          writer_properties_(properties.writer_properties()),
          reader_properties_(properties.reader_properties()) {}

    virtual ~Builder() = default;

    /// Specify the memory pool for the rewriter. Default default_memory_pool.
    Builder* memory_pool(MemoryPool* pool) {
      pool_ = pool;
      return this;
    }

    /// Set the writer properties.
    Builder* writer_properties(std::shared_ptr<WriterProperties> properties) {
      writer_properties_ = std::move(properties);
      return this;
    }

    /// Set the reader properties.
    Builder* reader_properties(ReaderProperties properties) {
      reader_properties_ = std::move(properties);
      return this;
    }

    /// Build the RewriterProperties with the builder parameters.
    std::shared_ptr<RewriterProperties> build() {
      return std::shared_ptr<RewriterProperties>(new RewriterProperties(
          pool_, std::move(writer_properties_), std::move(reader_properties_)));
    }

   private:
    MemoryPool* pool_;
    std::shared_ptr<WriterProperties> writer_properties_;
    ReaderProperties reader_properties_;
  };

  MemoryPool* memory_pool() const { return pool_; }

  const std::shared_ptr<WriterProperties>& writer_properties() const {
    return writer_properties_;
  }

  const ReaderProperties& reader_properties() const { return reader_properties_; }

 private:
  explicit RewriterProperties(MemoryPool* pool,
                              std::shared_ptr<WriterProperties> writer_properties,
                              ReaderProperties reader_properties)
      : pool_(pool),
        writer_properties_(std::move(writer_properties)),
        reader_properties_(std::move(reader_properties)) {}

  MemoryPool* pool_;
  std::shared_ptr<WriterProperties> writer_properties_;
  ReaderProperties reader_properties_;
};
```
The RewriterProperties class lacks documentation. As a new public API class (marked with PARQUET_EXPORT), it should have comprehensive documentation explaining its purpose, how it relates to WriterProperties and ReaderProperties, and what configuration options it provides for the rewriter.
```cpp
virtual void SetColumnIndex(int32_t i,
                            const std::shared_ptr<ColumnIndex>& column_index) = 0;
```
The new methods SetColumnIndex and SetOffsetIndex lack documentation comments. These are part of the public PageIndexBuilder interface and should include documentation explaining their purpose, parameters (especially the shift parameter in SetOffsetIndex), and when they should be used instead of the corresponding builder methods.
Suggested change:

```cpp
/// \brief Set a fully constructed ColumnIndex for a column.
///
/// This method can be used instead of GetColumnIndexBuilder() when the caller
/// already has a materialized ColumnIndex instance (for example, when reusing
/// or transforming an existing index) and does not need to build it through
/// the ColumnIndexBuilder interface.
///
/// \param i Column ordinal.
/// \param column_index The ColumnIndex to associate with the given column.
virtual void SetColumnIndex(int32_t i,
                            const std::shared_ptr<ColumnIndex>& column_index) = 0;

/// \brief Set a fully constructed OffsetIndex for a column.
///
/// This method can be used instead of GetOffsetIndexBuilder() when the caller
/// already has a materialized OffsetIndex instance.
///
/// The \p shift parameter is applied to all page offsets contained in the
/// provided OffsetIndex. It allows reusing an index whose offsets are relative
/// to a different file position (for example, when concatenating data or
/// writing the index at a different location) by shifting all stored offsets
/// by a constant amount.
///
/// \param i Column ordinal.
/// \param offset_index The OffsetIndex to associate with the given column.
/// \param shift A byte offset added to each page offset stored in \p offset_index.
```
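As a minimal illustration of the `shift` semantics described above, the following self-contained sketch models an offset index as a plain vector and applies a constant byte delta to every page offset. `PageLocation` here mirrors the three fields of the real Parquet struct, but `ShiftOffsets` is a hypothetical helper, not the library's implementation.

```cpp
#include <cstdint>
#include <vector>

// Simplified model of an offset index entry; the real parquet::PageLocation
// carries the same three fields.
struct PageLocation {
  int64_t offset;                // file-relative position of the page
  int32_t compressed_page_size;  // page size in bytes (unchanged by a shift)
  int64_t first_row_index;       // row-group-relative row index (unchanged)
};

// Apply a constant byte shift to every page offset, as the `shift` parameter
// of SetOffsetIndex is described to do when a copied column chunk lands at a
// different byte position in the target file.
std::vector<PageLocation> ShiftOffsets(std::vector<PageLocation> locations,
                                       int64_t shift) {
  for (auto& loc : locations) {
    loc.offset += shift;  // only file-relative offsets move
  }
  return locations;
}
```

Note that sizes and row indexes stay untouched: only quantities relative to the start of the file need rebasing when data is relocated.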
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
```cpp
std::shared_ptr<ResizableBuffer> buffer =
    AllocateBuffer(pool, kDefaultOutputStreamSize);
while (bytes_copied < size) {
  PARQUET_ASSIGN_OR_THROW(
      auto read_size,
      from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
  if (read_size == 0) {
    throw ParquetException("Unexpected end of stream at ", bytes_copied);
  }
  PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
  bytes_copied += read_size;
}
```
CopyStream allocates a 1KB buffer (kDefaultOutputStreamSize) for the non-zero-copy path. For large column chunks this will cause excessive small reads/writes and significantly degrade rewrite performance. Consider using a substantially larger chunk size (e.g., 64KB+ or an Arrow IO default) for the copy buffer.
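To make the buffer-size point concrete, here is a self-contained sketch of the same chunked-copy pattern written against `std::iostream` instead of Arrow's I/O interfaces. `kCopyBufferSize` and this `CopyStream` are illustrative stand-ins, not the PR's actual symbols; the point is that the chunk size bounds the number of read/write round trips, so a 64 KiB buffer needs roughly 64x fewer iterations than a 1 KiB one for the same column chunk.

```cpp
#include <algorithm>
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical chunk size: 64 KiB rather than a 1 KiB default.
constexpr int64_t kCopyBufferSize = 64 * 1024;

// Copy exactly `size` bytes from `from` to `to` through a bounded buffer,
// throwing if the input ends early. Returns the number of bytes copied.
int64_t CopyStream(std::istream& from, std::ostream& to, int64_t size) {
  std::vector<char> buffer(kCopyBufferSize);
  int64_t bytes_copied = 0;
  while (bytes_copied < size) {
    const int64_t want = std::min<int64_t>(size - bytes_copied,
                                           static_cast<int64_t>(buffer.size()));
    from.read(buffer.data(), want);
    const int64_t read_size = from.gcount();
    if (read_size == 0) {
      throw std::runtime_error("Unexpected end of stream at " +
                               std::to_string(bytes_copied));
    }
    to.write(buffer.data(), read_size);
    bytes_copied += read_size;
  }
  return bytes_copied;
}
```

For in-memory sources Arrow can avoid this loop entirely via zero-copy reads; the buffer size only matters on the fallback path the reviewer is discussing.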
PR #47775 Review: GH-47628: [C++][Parquet] Implement basic parquet file rewriter
Summary

This PR adds a …

Critical Issues 🔴

1. [BLOCKING] While Arrow targets C++20 (confirmed in …) … Suggested fix: Replace …
2. [BLOCKING] … Suggested fix: Instead of exposing …
3. [BLOCKING] … Suggested fix: For the row-group-level fast copy, compute the byte range from individual column chunk offsets (min start_offset → max end offset). Consider falling back to column-by-column copy when row-group-level metadata is unreliable.
4. [BLOCKING] … Suggested fix: Document this limitation clearly or account for it. At minimum, add a comment noting this only works for unencrypted columns.

Major Suggestions 🟡

5. Missing Doxygen documentation on all new public APIs. Similarly, …
6. …

   ```cpp
   if (column_builders_[i] == NULLPTR) {
     total_compressed_size += row_group_->columns[i].meta_data.total_compressed_size;
     continue;
   }
   ```

   This is fragile — any future code that iterates … Suggestion: Consider using …
7. …
8. …
9. … Suggested fix: Mark it …
10. …

    ```cpp
    from->Read(std::min(size - bytes_copied, buffer->size()), &buffer)
    ```

    The … Suggestion: Use …

Minor Suggestions 🟢

11. [NIT] …
12. [NIT] …
13. [NIT] Missing test for encryption rejection. The code throws …
14. [NIT] Missing test for page index and bloom filter round-trip. While …

AI Code Smell 🤖

None confidently detected. The code has a consistent style, uses domain-appropriate terminology, and the abstractions (PagesRewriter → ColumnChunkRewriter → RowGroupRewriter → SingleFileRewriter → ConcatRewriter → JoinRewriter) mirror the Parquet file structure naturally. The TODO comments reference specific usernames/futures, which is genuine.

Positive Notes 👍
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
```cpp
auto first_row_index =
    original_offset_index_
        ? std::optional{original_offset_index_->page_locations()[page_no]
                            .first_row_index}
        : std::nullopt;
SizeStatistics size_statistics;
size_statistics.unencoded_byte_array_data_bytes =
    original_offset_index_ &&
            !original_offset_index_->unencoded_byte_array_data_bytes().empty()
        ? std::optional{original_offset_index_
                            ->unencoded_byte_array_data_bytes()[page_no]}
        : std::nullopt;
```
There is no bounds checking when accessing page_locations()[page_no] and unencoded_byte_array_data_bytes()[page_no]. If the original_offset_index has fewer entries than the number of data pages (due to file corruption or malformed data), this will result in out-of-bounds access. Consider adding validation like: if (page_no >= original_offset_index_->page_locations().size()) before accessing the arrays.
Suggested change:

```cpp
std::optional<int64_t> first_row_index;
if (original_offset_index_) {
  const auto& locations = original_offset_index_->page_locations();
  if (page_no < locations.size()) {
    first_row_index = locations[page_no].first_row_index;
  }
}
SizeStatistics size_statistics;
if (original_offset_index_) {
  const auto& unencoded_bytes =
      original_offset_index_->unencoded_byte_array_data_bytes();
  if (!unencoded_bytes.empty() && page_no < unencoded_bytes.size()) {
    size_statistics.unencoded_byte_array_data_bytes =
        std::optional{unencoded_bytes[page_no]};
  }
}
```
```cpp
original_offset_index_
    ? std::optional{original_offset_index_->page_locations()[page_no]
                        .first_row_index}
    : std::nullopt;
SizeStatistics size_statistics;
size_statistics.unencoded_byte_array_data_bytes =
    original_offset_index_ &&
    !original_offset_index_->unencoded_byte_array_data_bytes().empty()
```
There is no bounds checking when accessing page_locations()[page_no] and unencoded_byte_array_data_bytes()[page_no]. If the original_offset_index has fewer entries than the number of data pages (due to file corruption or malformed data), this will result in out-of-bounds access. Consider adding validation like: if (page_no >= original_offset_index_->page_locations().size()) before accessing the arrays.
Suggested change:

```cpp
original_offset_index_ &&
        page_no < original_offset_index_->page_locations().size()
    ? std::optional{original_offset_index_->page_locations()[page_no]
                        .first_row_index}
    : std::nullopt;
SizeStatistics size_statistics;
size_statistics.unencoded_byte_array_data_bytes =
    original_offset_index_ &&
    !original_offset_index_->unencoded_byte_array_data_bytes().empty() &&
    page_no < original_offset_index_->unencoded_byte_array_data_bytes().size()
```
I think this PR has implemented basic functions. Please take a look @pitrou @adamreeve
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.
```cpp
static std::unique_ptr<ParquetFileRewriter> Open(
    std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
    std::shared_ptr<ArrowOutputStream> sink,
    std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
    std::shared_ptr<const ::arrow::KeyValueMetadata> sink_metadata = NULLPTR,
    std::shared_ptr<RewriterProperties> props = default_rewriter_properties());
```
The Open method parameters would benefit from documentation explaining the 2D vector structure. The outer vector represents groups of files to join (horizontally combine columns), and the inner vector represents files to concatenate (vertically stack row groups). Consider adding parameter documentation to clarify this structure and usage.
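A toy, self-contained model of that 2D layout may help visualize it. `FileShape` and `CombinedShape` below are hypothetical illustrations, not the real Arrow/Parquet types: each inner vector is a concat chain whose row groups stack vertically, and the outer vector's chains are joined side by side, contributing their columns.

```cpp
#include <vector>

// Toy stand-in for one source Parquet file: just its column count and
// row-group count. (The real API takes ArrowInputFile handles.)
struct FileShape {
  int num_columns;
  int num_row_groups;
};

// Shape of the rewritten file: columns come from every join group (outer
// dimension), row groups from concatenating each chain (inner dimension).
// Assumes every file in a chain has the same columns, and every chain
// yields the same total rows, matching the constraints Open implies.
FileShape CombinedShape(const std::vector<std::vector<FileShape>>& sources) {
  FileShape out{0, 0};
  for (const auto& concat_chain : sources) {
    int row_groups = 0;
    for (const auto& file : concat_chain) row_groups += file.num_row_groups;
    out.num_columns += concat_chain.front().num_columns;  // joined horizontally
    out.num_row_groups = row_groups;  // each chain stacks vertically
  }
  return out;
}
```

For example, `{{A1, A2}, {B1}}` would concatenate A1 and A2's row groups, then join that result with B1's columns.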
Rationale for this change
Allow rewriting Parquet files at the binary (encoded-bytes) level instead of reading and decoding all values and writing them back.
What changes are included in this PR?
- Add `ParquetFileRewriter` and `RewriterProperties`.
- Add `to_thrift` and `SetXXX` methods to help me copy the metadata.
- Add `CopyStream` methods to call `memcpy` between `ArrowInputStream` and `ArrowOutputStream`.
- Add `RowGroupMetaDataBuilder::NextColumnChunk(std::unique_ptr<ColumnChunkMetaData> cc_metadata, int64_t shift)`, which allows adding column metadata without creating a `ColumnChunkMetaDataBuilder`.
Yes
Are there any user-facing changes?
- `ReaderProperties::GetStream` is changed to a const method. Only the signature has been changed; its original implementation allows it to be declared as a const method.