More accurate estimate on parquet row groups size #11258

Open · wants to merge 8 commits into base: main
2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -533,7 +533,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce

assertThat(recordsFile).isNotNull();
// rowgroup size should be > 1
- assertThat(splitCount(recordsFile)).isEqualTo(3);
+ assertThat(splitCount(recordsFile)).isEqualTo(2);

assertThat(metrics.recordCount()).isEqualTo(201L);
assertCounts(1, 201L, 0L, metrics);

@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private boolean closed;
private ParquetFileWriter writer;
private int rowGroupOrdinal;
+ private long currentRawBufferedSize = 0;
Member: So for my own clarity, this is:

currentRawBufferedSize = the amount of "buffer" in the current row group, calculated as a diff in getBufferedSize()
totalRawBufferedSize = the amount of "buffer" in all row groups, calculated as a diff in getBufferedSize()
totalRowGroupSize = the amount of "buffer" in all row groups, calculated by just adding getBufferedSize() for each row group together
Member: Is this right?

Author: Each row group's data has two different sizes:

  1. in memory, before being written (in_memory_size)
  2. written out as a complete row group (written_size)

currentRawBufferedSize: "in_memory" size of the current, unfinished row group
totalRawBufferedSize: sum(in_memory for rg in finished_row_groups)
totalRowGroupSize: sum(written for rg in finished_row_groups)

We use the totalRowGroupSize / totalRawBufferedSize ratio to estimate the size change from the in-memory state to the written state.
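To make the arithmetic concrete, here is a minimal standalone sketch of the estimate described above, with made-up numbers; the class and values are illustrative, and only the formula mirrors the PR's estimateBufferedSize():

```java
// Illustration of the estimate: scale the current row group's in-memory size by the
// written/in-memory ratio observed for the row groups that have already been flushed.
public class RowGroupSizeEstimateExample {
  public static void main(String[] args) {
    // Hypothetical totals accumulated from finished row groups.
    long totalRawBufferedSize = 40_000_000L; // sum of in-memory (buffered) sizes
    long totalRowGroupSize = 1_000_000L; // sum of written (encoded + compressed) sizes

    // In-memory size of the current, unfinished row group.
    long currentRawBufferedSize = 20_000_000L;

    // Same formula as estimateBufferedSize() in the diff below.
    long estimated = currentRawBufferedSize * totalRowGroupSize / totalRawBufferedSize;

    System.out.println(estimated); // 500000: the buffered data is expected to shrink ~40x
  }
}
```

Note the fallback in the method below: before the first row group is flushed, all three counters are zero and the raw writeStore.getBufferedSize() is returned unchanged.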

Member: Can we do some renames of the variables to match these definitions? I think it's hard (at least for me) to follow.

Contributor: +1 for renaming the variables to make it clearer.

Author: How about:

currentRawBufferedSize -> currentRowGroupMemoryBufferSize
totalRawBufferedSize -> cumulativeMemoryBufferSize
totalRowGroupSize -> cumulativeWrittenSize

+ private long totalRawBufferedSize = 0;
+ private long totalRowGroupSize = 0;

private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -132,7 +135,9 @@ private void ensureWriterInitialized() {
@Override
public void add(T value) {
recordCount += 1;
+ long sizeBeforeWrite = writeStore.getBufferedSize();
Member: We are now calling getBufferedSize() for every value; this seems like a big perf difference. Can we run the write benchmarks with this on and off?

model.write(0, value);
+ this.currentRawBufferedSize += writeStore.getBufferedSize() - sizeBeforeWrite;
writeStore.endRecord();
checkSize();
}
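On the benchmark question above, a sketch along these lines could be run once on main and once on this branch to compare add() throughput. This is a hypothetical JMH benchmark, not the repository's existing benchmark suite, and it assumes the generic Iceberg writer path (Parquet.write with GenericParquetWriter::buildWriter):

```java
// Hypothetical JMH sketch (not part of the PR) to measure per-record overhead of the
// extra getBufferedSize() calls; run on main and on this branch and compare results.
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 2)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
public class ParquetAddOverheadBenchmark {

  private static final Schema SCHEMA =
      new Schema(Types.NestedField.optional(1, "stringCol", Types.StringType.get()));

  private Record record;

  @Setup
  public void setup() {
    record = GenericRecord.create(SCHEMA);
    record.setField("stringCol", "a reasonably long string value that buffers some bytes");
  }

  @Benchmark
  public void writeOneMillionRecords() throws IOException {
    File file = File.createTempFile("parquet-add-benchmark", ".parquet");
    try (FileAppender<Record> appender =
        Parquet.write(Files.localOutput(file))
            .schema(SCHEMA)
            .createWriterFunc(GenericParquetWriter::buildWriter) // assumed writer factory
            .overwrite()
            .build()) {
      for (int i = 0; i < 1_000_000; i++) {
        appender.add(record); // exercises the per-record size tracking in this PR
      }
    } finally {
      file.delete();
    }
  }
}
```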
@@ -167,7 +172,7 @@ public long length() {
if (!closed && recordCount > 0) {
// recordCount > 0 when there are records in the write store that have not been flushed to
// the Parquet file
- length += writeStore.getBufferedSize();
+ length += estimateBufferedSize();
}

return length;
@@ -185,14 +190,25 @@ public List<Long> splitOffsets() {
return null;
}

+ /*
+  * Data size can shrink after being written out, due to encoding and compression.
+  * Use the ratio totalRowGroupSize / totalRawBufferedSize to estimate the size after write-out.
+  */
+ private long estimateBufferedSize() {
+   if (totalRowGroupSize == 0 || totalRawBufferedSize == 0 || currentRawBufferedSize == 0) {
+     return writeStore.getBufferedSize();
+   }
+
+   return currentRawBufferedSize * totalRowGroupSize / totalRawBufferedSize;
+ }

private void checkSize() {
if (recordCount >= nextCheckRecordCount) {
- long bufferedSize = writeStore.getBufferedSize();
- double avgRecordSize = ((double) bufferedSize) / recordCount;
-
- if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
+ long bufferedSize = estimateBufferedSize();
+ if (bufferedSize > targetRowGroupSize) {
flushRowGroup(false);
} else {
+ double avgRecordSize = ((double) bufferedSize) / recordCount;
long remainingSpace = targetRowGroupSize - bufferedSize;
long remainingRecords = (long) (remainingSpace / avgRecordSize);
this.nextCheckRecordCount =
@@ -211,6 +227,8 @@ private void flushRowGroup(boolean finished) {
writer.startBlock(recordCount);
writeStore.flush();
pageStore.flushToFileWriter(writer);
+ totalRawBufferedSize += currentRawBufferedSize;
+ totalRowGroupSize += writeStore.getBufferedSize();
Member: I'm not sure I understand this part. What are we adding to totalRowGroupSize here?

Author: totalRowGroupSize is the total size of written data. Every time a row group is written, we add the newly written data's size.

writer.endBlock();
if (!finished) {
writeStore.close();
@@ -245,6 +263,7 @@ private void startRowGroup() {
this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, pageStore);

model.setColumnStore(writeStore);
+ this.currentRawBufferedSize = 0;
}

@Override
49 changes: 49 additions & 0 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -36,6 +36,7 @@
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
+ import java.util.stream.IntStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
@@ -46,6 +47,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
+ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -57,6 +59,7 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
+ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -219,6 +222,52 @@ public void testTwoLevelList() throws IOException {
assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
}

@Test
public void testParquetRowGroupSize() throws IOException {
// verify parquet row group size should be close to configured size
int recordCount = 100000;
int columnCount = 50;

List<Types.NestedField> columns =
IntStream.rangeClosed(1, columnCount)
.mapToObj(i -> optional(i, "stringCol" + i, Types.StringType.get()))
.collect(ImmutableList.toImmutableList());
Schema schema = new Schema(columns);

File file = createTempFile(temp);

List<GenericData.Record> records = Lists.newArrayListWithCapacity(recordCount);
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
for (int i = 1; i <= recordCount; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
for (Types.NestedField column : columns) {
String value = column.name().repeat(10) + i;
record.put(column.name(), value);
}

records.add(record);
}

long actualSize =
write(
file,
schema,
ImmutableMap.of("write.parquet.row-group-size-bytes", "1048576"),
ParquetAvroWriter::buildWriter,
records.toArray(new GenericData.Record[] {}));

try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
ParquetMetadata footer = reader.getFooter();
for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
Contributor: Suggested change:

- for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
+ for (int i = 0; i < footer.getBlocks().size(); i++) {

Author: The test skips the first row group, because the first row group's size is not expected to be accurate.

Contributor: Can you please add a comment so that it is clear to a reader why the first one is skipped?

assertThat(footer.getBlocks().get(i).getCompressedSize())
Contributor: Is it correct to look at the compressed size here instead of looking at getTotalByteSize()?

Author: If the Parquet row group size refers to the on-disk size, then it should be getCompressedSize(). In this test, blocks[i] compressed size is 1,007,783, while total size is 41,462,625.

.isBetween((long) 900 * 1024, (long) 1200 * 1024);
}

assertThat(footer.getBlocks().get(footer.getBlocks().size() - 1).getCompressedSize())
.isLessThan((long) 1200 * 1024);
}
}
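On the compressed size vs. getTotalByteSize() question discussed above, the following standalone sketch (a hypothetical helper, not part of the PR; the file path is taken from the command line) prints both block-level sizes from a Parquet footer so they can be compared directly:

```java
// Print both row-group size metrics discussed above for an existing Parquet file.
// getCompressedSize() is the encoded + compressed on-disk size of the row group;
// getTotalByteSize() is its total uncompressed size.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class PrintRowGroupSizes {
  public static void main(String[] args) throws Exception {
    Path path = new Path(args[0]);
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        System.out.printf(
            "rows=%d compressed=%d uncompressed=%d%n",
            block.getRowCount(), block.getCompressedSize(), block.getTotalByteSize());
      }
    }
  }
}
```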

private Pair<File, Long> generateFile(
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
int desiredRecordCount,

@@ -557,8 +557,8 @@ public void testBinPackCombineMixedFiles() {

@Test
public void testBinPackCombineMediumFiles() {
- Table table = createTable(4);
- shouldHaveFiles(table, 4);
+ Table table = createTable(6);
+ shouldHaveFiles(table, 6);
Author (@jinyangli34, Oct 8, 2024), commenting on lines +560 to +561:

After this change, the test table has a smaller number of row groups. A small change like 4 -> 3 may cause a small tail when generating taskGroups.
Size of taskGroups with 4 -> 3: [197078, 188542, 189047, 15617]

Using 6 -> 3 gives better tolerance.
Contributor: I'm not sure I understand why changes to this test are being made.

Author: 4/3 may produce groups of [1.4, 1.3, 1.3] or [1.3, 1.3, 1.3, 0.1]. The test data generates [197078, 188542, 189047, 15617], i.e. a small tail. Using 4/2 or 6/3 makes the test more robust.


List<Object[]> expectedRecords = currentData();
int targetSize = ((int) testDataSize(table) / 3);
@@ -578,7 +578,7 @@ public void testBinPackCombineMediumFiles() {

assertThat(result.rewrittenDataFilesCount())
.as("Action should delete 4 data files")
- .isEqualTo(4);
+ .isEqualTo(6);
assertThat(result.addedDataFilesCount()).as("Action should add 3 data files").isEqualTo(3);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
