More accurate estimate on parquet row groups size #11258

Open · wants to merge 8 commits into base: main
Conversation

@jinyangli34 commented Oct 4, 2024

Context:
When writing data to Parquet, buffered data may sit in an in-memory buffer, already be encoded as pages, or be in a mixed state. The in-memory representation can be much larger than the encoded pages, and the row group can shrink further after the final compression.
This makes it difficult to estimate the size of the current row group, and results in row groups that are much smaller than the write.parquet.row-group-size-bytes config.

Fix:
Use the ratio of in-memory buffered size to written size observed on previous row groups to estimate the size of the current row group.
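
A minimal sketch of this idea, written with the field names that come up later in this review (currentRawBufferedSize, totalRawBufferedSize, totalRowGroupSize); the actual patch may differ in its details:

// Sketch only: scale the raw in-memory bytes of the current row group by the
// written-to-raw ratio observed on the row groups flushed so far.
private long estimateBufferedSize() {
  if (totalRowGroupSize == 0 || totalRawBufferedSize == 0 || currentRawBufferedSize == 0) {
    // No finished row group yet (or nothing buffered): fall back to the store's own size.
    return writeStore.getBufferedSize();
  }
  double writtenToRawRatio = (double) totalRowGroupSize / totalRawBufferedSize;
  return (long) (currentRawBufferedSize * writtenToRawRatio);
}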

Testing:
Writing 50 columns with a 1 MB row group size, using columnName + i as the cell value
(wide columns and a small row group size reduce the chance that data has already been encoded into pages when the size check runs)
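
A hypothetical sketch of such test data (50 string columns, every cell columnName + rowIndex); the actual test in this PR may construct its rows differently:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

class WideRowTestData {
  // 50 optional string columns named col1..col50
  static final Schema SCHEMA =
      new Schema(
          IntStream.rangeClosed(1, 50)
              .mapToObj(id -> Types.NestedField.optional(id, "col" + id, Types.StringType.get()))
              .collect(Collectors.toList()));

  // Every cell holds columnName + rowIndex, producing wide rows with little repetition per column.
  static List<GenericRecord> rows(int numRows) {
    List<GenericRecord> rows = new ArrayList<>(numRows);
    for (int i = 0; i < numRows; i++) {
      GenericRecord row = GenericRecord.create(SCHEMA);
      for (Types.NestedField field : SCHEMA.columns()) {
        row.setField(field.name(), field.name() + i);
      }
      rows.add(row);
    }
    return rows;
  }
}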

Before fix:
Block 1 compressed size: 30792
Block 2 compressed size: 30793
Block 3 compressed size: 30791
Block 4 compressed size: 30935
Block 5 compressed size: 30838
Block 6 compressed size: 30825
Block 7 compressed size: 30843
Block 8 compressed size: 30839


After fix:
Block 1 compressed size: 1007783
Block 2 compressed size: 1051313
Block 3 compressed size: 1056900
Block 4 compressed size: 1056199
Block 5 compressed size: 1059470
Block 6 compressed size: 1058124
Block 7 compressed size: 1060285
Block 8 compressed size: 1057551

@jinyangli34 changed the title from "add unit test for parquet row groups size" to "More accurate estimate on parquet row groups size" on Oct 4, 2024
@jinyangli34 marked this pull request as ready for review on October 4, 2024 21:02
@amogh-jahagirdar self-requested a review on October 4, 2024 22:47
@github-actions bot added the spark label on Oct 7, 2024
Comment on lines +560 to +561
Table table = createTable(6);
shouldHaveFiles(table, 6);
@jinyangli34 (Author) commented Oct 8, 2024

After this change, the test table has a smaller number of row groups. A small split such as 4 -> 3 can leave a small tail when generating taskGroups:
Size of taskGroups with 4 -> 3: [197078, 188542, 189047, 15617]

Using 6 -> 3 gives better tolerance.

Contributor:

I'm not sure I understand why changes to this test are being made

Author:

A 4/3 split may produce task groups like [1.4, 1.3, 1.3] or [1.3, 1.3, 1.3, 0.1].
The test data generated [197078, 188542, 189047, 15617], which has a small tail.
Using 4/2 or 6/3 makes the test more robust.

@@ -185,9 +190,17 @@ public List<Long> splitOffsets() {
return null;
}

private long estimateBufferSize() {
if (totalRowGroupSize == 0 || totalBufferSize == 0) {
Contributor:

Should this also fall back to writeStore.getBufferedSize() when currentBufferSize == 0? It would likely be 0 anyway, since I don't see another case, but that would make the fallbacks consistent.

@@ -185,9 +190,17 @@ public List<Long> splitOffsets() {
return null;
}

private long estimateBufferSize() {
Contributor:

Rename estimateBufferSize to estimateBufferedSize for naming consistency.

Contributor:

Also, can we add a few comments about the estimation logic? i.e. that the ratio totalRowGroupSize / totalBufferSize estimates the size reduction from the in-memory buffer to what will actually be written out encoded/compressed.

@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private boolean closed;
private ParquetFileWriter writer;
private int rowGroupOrdinal;
private long currentBufferSize = 0;
Contributor:

Since this mostly tracks what is added to memory directly on each add, I'd suggest renaming it to currentRawBufferedSize or something similar.

@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private boolean closed;
private ParquetFileWriter writer;
private int rowGroupOrdinal;
private long currentBufferSize = 0;
private long totalBufferSize = 0;
Contributor:

Same here: given this is the sum of the raw byte counts on each add, perhaps something like totalRawBufferedSize.

  private void checkSize() {
    if (recordCount >= nextCheckRecordCount) {
-     long bufferedSize = writeStore.getBufferedSize();
+     long bufferedSize = estimateBufferedSize();
      double avgRecordSize = ((double) bufferedSize) / recordCount;

      if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
Member:

Do we still need the -2 * avg record size headroom? It seems like that shouldn't really matter now that we are estimating our buffered size more accurately.
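
For reference, a hedged sketch of how the size check could read with the new estimate (the flushRowGroup(false) call is inferred from the flushRowGroup hunk below, and the else branch that schedules the next check is omitted). The 2 * avgRecordSize term leaves roughly two records of headroom so the group is flushed just before the estimate would overshoot the target:

private void checkSize() {
  if (recordCount >= nextCheckRecordCount) {
    long bufferedSize = estimateBufferedSize();  // previously writeStore.getBufferedSize()
    double avgRecordSize = ((double) bufferedSize) / recordCount;

    // Flush once the estimate is within roughly two records of the target row group size.
    if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
      flushRowGroup(false);
    }
    // else: schedule the next size check (omitted in this sketch)
  }
}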

@@ -211,6 +228,8 @@ private void flushRowGroup(boolean finished) {
      writer.startBlock(recordCount);
      writeStore.flush();
      pageStore.flushToFileWriter(writer);
+     totalRawBufferedSize += currentRawBufferedSize;
+     totalRowGroupSize += writeStore.getBufferedSize();
Member:

I'm not sure I understand this part, what are we adding to totalRowGroupSize here?

Author:

totalRowGroupSize is the total size of written data. Every time a row group is written, we add the new written data size.
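
A commented sketch of that accumulation, with the variable roles as explained in this thread; the reset of currentRawBufferedSize is an assumption and is not visible in this hunk:

// after the row group has been flushed to the file writer:
totalRawBufferedSize += currentRawBufferedSize;    // raw in-memory bytes buffered for the finished group
totalRowGroupSize += writeStore.getBufferedSize(); // written size of the finished group, per the author's note
currentRawBufferedSize = 0;                        // assumed: start tracking the next row group from zero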

@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private boolean closed;
private ParquetFileWriter writer;
private int rowGroupOrdinal;
private long currentRawBufferedSize = 0;
Member:

So for my own clarity, this is:

currentRawBufferedSize = the amount of "buffer" in the current row group, calculated as a diff in getBufferedSize()
totalRawBufferedSize = the amount of "buffer" in all row groups, calculated as a diff in getBufferedSize()
totalRowGroupSize = the amount of "buffer" in all row groups, calculated by just adding getBufferedSize() for each row group together

Member:

Is this right?

Author:

Each row group's data has two different sizes:

  1. in memory, before being written (in_memory_size)
  2. written as a complete row group (written_size)

currentRawBufferedSize: in_memory size of the current, unfinished row group
totalRawBufferedSize: sum(in_memory for rg in finished_row_groups)
totalRowGroupSize: sum(written for rg in finished_row_groups)

We use the (totalRawBufferedSize / totalRowGroupSize) ratio to estimate the size change from the in-memory to the written state.
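
An illustrative example with made-up numbers: if the finished row groups buffered 40 MB in memory but wrote out 10 MB, the ratio is 4:1, so 4 MB currently buffered in memory for the unfinished row group is estimated to land at roughly 1 MB once written.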

Member:

Can we do some renames of the variables to match these definitions? I think it's hard (at least for me) to follow

Contributor:

+1 for renaming the variables to make it clearer

Author:

how about
currentRawBufferedSize -> currentRowGroupMemoryBufferSize
totalRawBufferedSize -> cumulativeMemoryBufferSize
totalRowGroupSize -> cumulativeWrittenSize

@@ -132,7 +135,9 @@ private void ensureWriterInitialized() {
@Override
public void add(T value) {
recordCount += 1;
long sizeBeforeWrite = writeStore.getBufferedSize();
Member:

We are now calling getBufferedSize for every value; that seems like it could be a big perf difference. Can we run the write benchmarks with this on and off?

@jinyangli34 (Author):

Write benchmark results. Result file suffixes:
.main -> Iceberg main branch, without this change
.0 -> with the change in this PR
.2 -> .0 plus 2 extra writeStore.getBufferedSize() calls per added value
.4 -> .0 plus 4 extra writeStore.getBufferedSize() calls per added value

SparkParquetWritersNestedDataBenchmark:

% spark-parquet-writers-nested-data-benchmark-result.txt.main
Benchmark                                                       Mode  Cnt  Score   Error  Units
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter    ss    5  1.984 ± 0.249   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter      ss    5  2.143 ± 0.602   s/op

% spark-parquet-writers-nested-data-benchmark-result.txt.0
Benchmark                                                       Mode  Cnt  Score   Error  Units
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter    ss    5  2.139 ± 0.889   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter      ss    5  2.020 ± 0.220   s/op

% spark-parquet-writers-nested-data-benchmark-result.txt.2
Benchmark                                                       Mode  Cnt  Score   Error  Units
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter    ss    5  2.097 ± 0.297   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter      ss    5  2.144 ± 0.286   s/op

% spark-parquet-writers-nested-data-benchmark-result.txt.4
Benchmark                                                       Mode  Cnt  Score   Error  Units
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter    ss    5  2.188 ± 0.555   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter      ss    5  2.121 ± 0.351   s/op

SparkParquetWritersFlatDataBenchmark:

% spark-parquet-writers-flat-data-benchmark-result.txt.main
Benchmark                                                     Mode  Cnt  Score   Error  Units
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter    ss    5  3.181 ± 0.201   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter      ss    5  3.377 ± 0.898   s/op

% spark-parquet-writers-flat-data-benchmark-result.txt.0
Benchmark                                                     Mode  Cnt  Score   Error  Units
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter    ss    5  3.398 ± 0.652   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter      ss    5  3.494 ± 0.242   s/op

% spark-parquet-writers-flat-data-benchmark-result.txt.2
Benchmark                                                     Mode  Cnt  Score   Error  Units
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter    ss    5  3.586 ± 0.754   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter      ss    5  3.393 ± 0.698   s/op

% spark-parquet-writers-flat-data-benchmark-result.txt.4
Benchmark                                                     Mode  Cnt  Score   Error  Units
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter    ss    5  3.561 ± 0.302   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter      ss    5  3.356 ± 0.518   s/op

@RussellSpitzer (Member):

> write benchmarks. result suffix: .main -> Iceberg main branch without this change .0 -> With Change in this PR .2 -> 0 + call writeStore.getBufferedSize() 2 more times for each add value .4 -> 0 + call writeStore.getBufferedSize() 4 more times for each add value

I'm not sure what you are running in these benchmarks. I would very much worry that the extra calls in .2 and .4 are not being blackholed and are simply erased by the JVM. Could you adjust the benchmarks to test a larger file size and summarize your findings?

@jinyangli34 (Author):

Ran the benchmark again with NUM_RECORDS increased from 1M to 5M.

Tested 4 groups:
main: main branch without the change in this PR
PR: this PR
PR+2: two extra getBufferedSize calls per added value

   @Override
   public void add(T value) {
     recordCount += 1;
+    long size1 = writeStore.getBufferedSize();
+    long size2 = writeStore.getBufferedSize();
+    if (size1 != size2) {
+      throw new RuntimeException("Buffered size changed after adding a record");
+    }
     long sizeBeforeWrite = writeStore.getBufferedSize();
     model.write(0, value);
     this.currentRawBufferedSize += writeStore.getBufferedSize() - sizeBeforeWrite;

PR+4: four extra getBufferedSize calls per added value

   @Override
   public void add(T value) {
     recordCount += 1;
+    long size1 = writeStore.getBufferedSize();
+    long size2 = writeStore.getBufferedSize();
+    long size3 = writeStore.getBufferedSize();
+    long size4 = writeStore.getBufferedSize();
+    if (size1 != size2 || size3 != size4) {
+      throw new RuntimeException("Buffered size changed after adding a record");
+    }
     long sizeBeforeWrite = writeStore.getBufferedSize();
     model.write(0, value);
     this.currentRawBufferedSize += writeStore.getBufferedSize() - sizeBeforeWrite;

Avg numbers:

Flat Benchmark Avg (s/op)     Main     PR       PR+2     PR+4
writeUsingIcebergWriter       15.773   15.976   16.672   17.133
writeUsingSparkWriter         16.056   15.826   15.830   15.891

Nested Benchmark Avg (s/op)   Main     PR       PR+2     PR+4
writeUsingIcebergWriter        9.683    9.775    9.978   10.199
writeUsingSparkWriter         10.156    9.676    9.698    9.683

Comparing this PR against the main branch:
the Iceberg writer is 1.3% slower for flat data and 0.95% slower for nested data;
the Spark writer is 1.4% faster for flat data and 4.7% faster for nested data.

iceberg-pr-11258-perf-test.csv

@github-actions bot added the core label Oct 9, 2024
@jinyangli34 (Author):

@RussellSpitzer Please check the previous comment with new test results

@RussellSpitzer (Member):

I'm very suspicious of the Spark writer getting faster; do we have any explanation for why that is? Shouldn't it be using the same underlying code, so that any differences would show up in both writers?

@RussellSpitzer (Member):

@nastra Can you please take a look at this as well? I want someone else who is familiar with the write path to double-check.

try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
  ParquetMetadata footer = reader.getFooter();
  for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
    assertThat(footer.getBlocks().get(i).getCompressedSize())
Contributor:

Is it correct to look at the compressed size here instead of looking at getTotalByteSize()?

Author:

If the Parquet row group size refers to the on-disk size, then it should be getCompressedSize().
In this test, blocks[i] compressed size is 1,007,783, while the total (uncompressed) size is 41,462,625.


try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
  ParquetMetadata footer = reader.getFooter();
  for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
Contributor:

Suggested change:
- for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
+ for (int i = 0; i < footer.getBlocks().size(); i++) {

Author:

The test skips the first row group, which is expected to be inaccurate in size (there is no earlier row group yet to derive the buffered-to-written ratio from).

Contributor:

Can you please add a comment so that it is clear to a reader why the first one is skipped?

@nastra (Contributor) commented Oct 22, 2024

> This makes it difficult to estimate the current row group size, and result in creating much smaller row-group than write.parquet.row-group-size-bytes config

@jinyangli34 is this because your assumption is that write.parquet.row-group-size-bytes refers to the compressed row group size?

@jinyangli34 (Author):

> This makes it difficult to estimate the current row group size, and result in creating much smaller row-group than write.parquet.row-group-size-bytes config
>
> @jinyangli34 is this because your assumption is that write.parquet.row-group-size-bytes refers to the compressed row group size?

@nastra yes, see the discussion here: https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1727984247713889

And per the Parquet docs, the row group size maps to the HDFS block size, so it should be the on-disk size:

Row Group Size

Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions bot added the stale label Nov 23, 2024
@nastra added the not-stale label and removed the stale label Nov 25, 2024