More accurate estimate on parquet row groups size #11258
base: main
Conversation
Table table = createTable(6);
shouldHaveFiles(table, 6);
After this change, the test table has a smaller number of row groups. A small change like 4 -> 3 may cause a small tail when generating taskGroups.
Sizes of taskGroups with 4 -> 3: [197078, 188542, 189047, 15617]
Using 6 -> 3 gives better tolerance.
I'm not sure I understand why changes to this test are being made
4/3 may produce groups like [1.4, 1.3, 1.3] or [1.3, 1.3, 1.3, 0.1].
The test data generates [197078, 188542, 189047, 15617], with a small tail.
Using 4/2 or 6/3 makes the test more robust.
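To illustrate the tail effect discussed above, here is a small, hypothetical sketch (the class and method names are mine, and the greedy packing only mimics, not reproduces, Iceberg's split planning). Packing the reported row-group sizes with a target near one row group leaves the 15617-byte tail as its own tiny task group:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: greedy packing of row-group sizes into task groups.
public class TaskGroupTail {
  // Pack sizes into groups of at most targetSize bytes each (greedy, in order).
  public static List<Long> packGroups(long[] sizes, long targetSize) {
    List<Long> groups = new ArrayList<>();
    long current = 0;
    for (long size : sizes) {
      if (current > 0 && current + size > targetSize) {
        groups.add(current); // close the current group before it overflows
        current = 0;
      }
      current += size;
    }
    if (current > 0) {
      groups.add(current); // whatever remains becomes the (possibly tiny) tail
    }
    return groups;
  }

  public static void main(String[] args) {
    // Row-group sizes from the review comment above.
    long[] sizes = {197078, 188542, 189047, 15617};
    // Prints [197078, 188542, 189047, 15617]: the last group is a small tail.
    System.out.println(packGroups(sizes, 200000));
  }
}
```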
@@ -185,9 +190,17 @@ public List<Long> splitOffsets() {
    return null;
  }

  private long estimateBufferSize() {
    if (totalRowGroupSize == 0 || totalBufferSize == 0) {
Should this also fall back to writeStore.getBufferedSize() when currentBufferSize == 0? It would likely be 0, since I don't see another case, but I feel that would make the fallbacks consistent.
@@ -185,9 +190,17 @@ public List<Long> splitOffsets() {
    return null;
  }

  private long estimateBufferSize() {
Rename estimateBufferSize to estimateBufferedSize for naming consistency.
Also, can we add a few comments about the estimation logic? That is, explain what the ratio totalRowGroupSize / totalBufferSize means: it estimates the size reduction from in-memory data to what will actually be written out encoded/compressed.
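A minimal sketch of what such a documented estimate could look like. This follows the field and method names discussed in this thread, but the exact PR code may differ, and the fallback parameter is my own simplification standing in for writeStore.getBufferedSize():

```java
// Sketch only: ratio-based estimate of the current row group's written size.
public class BufferEstimator {
  // Raw bytes buffered in memory for the current, unfinished row group.
  public long currentRawBufferedSize;
  // Sum of raw in-memory sizes of all finished row groups.
  public long totalRawBufferedSize;
  // Sum of encoded/compressed sizes of all finished row groups as written.
  public long totalRowGroupSize;

  // Scale the current raw in-memory size by the observed written/raw ratio
  // of past row groups to approximate the size after encoding/compression.
  public long estimateBufferedSize(long fallbackBufferedSize) {
    if (totalRowGroupSize == 0 || totalRawBufferedSize == 0) {
      // No history yet (first row group): fall back to the store's own size.
      return fallbackBufferedSize;
    }
    double ratio = (double) totalRowGroupSize / totalRawBufferedSize;
    return (long) (currentRawBufferedSize * ratio);
  }

  public static void main(String[] args) {
    BufferEstimator e = new BufferEstimator();
    e.totalRowGroupSize = 100;      // 100 bytes written so far...
    e.totalRawBufferedSize = 400;   // ...from 400 raw buffered bytes (ratio 0.25)
    e.currentRawBufferedSize = 800; // current raw buffer
    System.out.println(e.estimateBufferedSize(0)); // prints 200
  }
}
```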
@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
  private boolean closed;
  private ParquetFileWriter writer;
  private int rowGroupOrdinal;
  private long currentBufferSize = 0;
Since this mostly tracks what is being added in memory directly on each add, I'd suggest renaming it to currentRawBufferedSize or something similar.
@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
  private boolean closed;
  private ParquetFileWriter writer;
  private int rowGroupOrdinal;
  private long currentBufferSize = 0;
  private long totalBufferSize = 0;
Same here: given this is the sum of the raw byte counts on each add, perhaps something like totalRawBufferedSize.
  private void checkSize() {
    if (recordCount >= nextCheckRecordCount) {
-     long bufferedSize = writeStore.getBufferedSize();
+     long bufferedSize = estimateBufferedSize();
      double avgRecordSize = ((double) bufferedSize) / recordCount;

      if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
Do we still need the - 2 * avgRecordSize? It seems like that shouldn't really matter now that we are estimating our buffered size more accurately.
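For context on the headroom being questioned, here is a standalone sketch of the check (the helper name shouldFlush and the example numbers are mine; the real code lives inside ParquetWriter.checkSize):

```java
// Sketch of the flush check: flush once fewer than ~2 average-sized
// records of headroom remain before the target row group size.
public class RowGroupCheck {
  public static boolean shouldFlush(long bufferedSize, long recordCount, long targetRowGroupSize) {
    double avgRecordSize = ((double) bufferedSize) / recordCount;
    return bufferedSize > (targetRowGroupSize - 2 * avgRecordSize);
  }

  public static void main(String[] args) {
    // 1 MiB target, 10,000 records buffered at ~105 bytes each:
    // avg ~104.84, so the threshold is ~1,048,366 bytes.
    System.out.println(shouldFlush(1_048_400, 10_000, 1_048_576)); // prints true
    System.out.println(shouldFlush(500_000, 10_000, 1_048_576));   // prints false
  }
}
```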
@@ -211,6 +228,8 @@ private void flushRowGroup(boolean finished) {
    writer.startBlock(recordCount);
    writeStore.flush();
    pageStore.flushToFileWriter(writer);
    totalRawBufferedSize += currentRawBufferedSize;
    totalRowGroupSize += writeStore.getBufferedSize();
I'm not sure I understand this part: what are we adding to totalRowGroupSize here?
totalRowGroupSize is the total size of written data. Every time a row group is written, we add the newly written data size.
@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
  private boolean closed;
  private ParquetFileWriter writer;
  private int rowGroupOrdinal;
  private long currentRawBufferedSize = 0;
So, for my own clarity:
- currentRawBufferedSize = the amount of "buffer" in the current row group, calculated as a diff in getBufferedSize()
- totalRawBufferedSize = the amount of "buffer" in all row groups, calculated as a diff in getBufferedSize()
- totalRowGroupSize = the amount of "buffer" in all row groups, calculated by adding getBufferedSize() for each row group together
Is this right?
Each row group's data has two different sizes:
- in memory, before being written (in_memory_size)
- written out as a complete row group (written_size)
currentRawBufferedSize: in_memory_size of the current unfinished row group
totalRawBufferedSize: sum(in_memory_size for rg in finished_row_groups)
totalRowGroupSize: sum(written_size for rg in finished_row_groups)
We use the (totalRawBufferedSize / totalRowGroupSize) ratio to estimate the size change from the in-memory to the written state.
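The bookkeeping described above can be modeled in a self-contained way. This is a sketch with hypothetical hook names (onAdd, onFlushRowGroup); in the real writer these updates happen inside add() and flushRowGroup():

```java
// Model of the three size counters and how they evolve over the writer's life.
public class SizeTracker {
  public long currentRawBufferedSize; // in_memory_size of current unfinished row group
  public long totalRawBufferedSize;   // sum of in_memory_size over finished row groups
  public long totalRowGroupSize;      // sum of written_size over finished row groups

  // Per record: rawDelta is the growth of getBufferedSize() during add().
  public void onAdd(long rawDelta) {
    currentRawBufferedSize += rawDelta;
  }

  // On row group flush: writtenSize is the group's encoded/compressed size.
  public void onFlushRowGroup(long writtenSize) {
    totalRawBufferedSize += currentRawBufferedSize;
    totalRowGroupSize += writtenSize;
    currentRawBufferedSize = 0; // a new row group starts empty
  }

  // Observed written/raw ratio; 1.0 until there is any history.
  public double writtenToRawRatio() {
    if (totalRawBufferedSize == 0) {
      return 1.0;
    }
    return (double) totalRowGroupSize / totalRawBufferedSize;
  }

  public static void main(String[] args) {
    SizeTracker t = new SizeTracker();
    for (int i = 0; i < 4; i++) {
      t.onAdd(100); // 400 raw bytes buffered
    }
    t.onFlushRowGroup(100); // encoded/compressed to 100 bytes
    System.out.println(t.writtenToRawRatio()); // prints 0.25
  }
}
```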
Can we do some renames of the variables to match these definitions? I think it's hard (at least for me) to follow
+1 for renaming the variables to make it clearer
How about:
- currentRawBufferedSize -> currentRowGroupMemoryBufferSize
- totalRawBufferedSize -> cumulativeMemoryBufferSize
- totalRowGroupSize -> cumulativeWrittenSize
@@ -132,7 +135,9 @@ private void ensureWriterInitialized() {
  @Override
  public void add(T value) {
    recordCount += 1;
    long sizeBeforeWrite = writeStore.getBufferedSize();
We are now calling getBufferedSize for every value; this seems like a big perf difference. Can we run the write benchmarks with this on and off?
Write benchmark results:
SparkParquetWritersNestedDataBenchmark:
SparkParquetWritersFlatDataBenchmark:
I'm not sure what you are running in these benchmarks. I would very much worry that your experiments in .2 and .4 are not being blackholed and are erased by the JVM. Could you adjust the benchmarks to test a larger file size and summarize your findings?
Ran the benchmark again with an increased size. Tested 4 groups:
PR+4: four more getBufferedSize calls per add value
Avg numbers:
Comparing this PR vs the main branch, after this change:
@RussellSpitzer Please check the previous comment with the new test results.
To blackhole values you can use https://javadoc.io/doc/org.openjdk.jmh/jmh-core/1.23/org/openjdk/jmh/infra/Blackhole.html
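In real JMH benchmarks the fix is to accept a Blackhole parameter and call Blackhole.consume(value) on the measured result. Purely as an illustration of why consuming results matters (this is not JMH code, and the class and helper names here are hypothetical), a volatile sink can stand in for the idea:

```java
// Hypothetical stand-in for org.openjdk.jmh.infra.Blackhole: if a benchmark
// computes a value and nothing observes it, the JIT may treat the whole
// computation as dead code and the benchmark measures an empty loop.
public class BlackholeSketch {
  private static volatile long sink; // a volatile write the JIT cannot elide

  public static void consume(long value) {
    sink = value;
  }

  // The "work" a benchmark body would measure.
  public static long measuredWork(int n) {
    long total = 0;
    for (int i = 0; i < n; i++) {
      total += i * 31L;
    }
    return total;
  }

  public static void main(String[] args) {
    // Without consume(...), nothing observes the result and the loop is
    // eligible for dead-code elimination; with it, the work must run.
    consume(measuredWork(1_000));
  }
}
```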
I'm very suspicious of the Spark writer getting faster; do we have any explanation of why that is? Shouldn't it be using the same underlying code, so any differences should be seen in both writers?
@nastra Can you please take a look at this as well? I want someone else who is familiar with the write path to double check |
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
  ParquetMetadata footer = reader.getFooter();
  for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
    assertThat(footer.getBlocks().get(i).getCompressedSize())
Is it correct to look at the compressed size here instead of looking at getTotalByteSize()?
If the parquet row group size refers to on-disk size, then it should be getCompressedSize().
In this test, the compressed size of blocks[i] is 1,007,783 and the total size is 41,462,625.
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
  ParquetMetadata footer = reader.getFooter();
  for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
- for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
+ for (int i = 0; i < footer.getBlocks().size(); i++) {
The test skips the first row group; its size is expected to be inaccurate.
Can you please add a comment so that it is clear to a reader why the first one is skipped?
@jinyangli34 is this because your assumption is that
@nastra yes, see the discussion here: https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1727984247713889. From the Parquet docs, it maps to the HDFS block size, so it should be the on-disk size.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
Context:
When writing data to Parquet, the data can be in an in-memory buffer, encoded as pages, or in a mixed state. The in-memory data can be much larger than the encoded pages, and the row group can shrink further after final compression. This makes it difficult to estimate the current row group size, and results in creating much smaller row groups than the write.parquet.row-group-size-bytes config requests.
Fix:
Use the previous in-memory vs. written size ratios to estimate the size of the current row group.
Testing:
Writing 50 columns with a 1 MB row group size, using columnName + i as data (wide columns and a small row group size reduce the chance of encoding happening).
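To make the effect of the fix concrete, here is a sketch with assumed numbers (not taken from the PR benchmarks; the helper name ratioEstimate is mine). If past row groups shrank 4x when written, a raw buffer that just hit the 1 MiB target would actually produce only a ~256 KiB row group, which the ratio-corrected estimate reflects:

```java
// Compare the naive raw-buffer size against the ratio-corrected estimate.
public class EstimateSimulation {
  public static long ratioEstimate(long currentRaw, long totalWritten, long totalRaw) {
    if (totalWritten == 0 || totalRaw == 0) {
      return currentRaw; // no history yet: fall back to the raw size
    }
    return (long) (currentRaw * ((double) totalWritten / totalRaw));
  }

  public static void main(String[] args) {
    long target = 1_048_576;       // 1 MiB target row group size
    long totalRaw = 4_000_000;     // raw bytes of previously flushed groups
    long totalWritten = 1_000_000; // their size once encoded/compressed (4x smaller)
    long currentRaw = target;      // current raw buffer just reached the target

    // Naive check would flush now, yielding a row group ~4x under target.
    System.out.println("naive estimate: " + currentRaw);
    // Ratio-corrected estimate: keep buffering until ~4 MiB of raw data.
    System.out.println("ratio estimate: " + ratioEstimate(currentRaw, totalWritten, totalRaw));
  }
}
```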