More accurate estimate of Parquet row group size #11258
base: main
Changes from all commits: 07f6b97, 5854ccb, c83198e, 5e64668, 742eaac, 07fe927, 98ecfac, a6e9ef3
@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private boolean closed;
   private ParquetFileWriter writer;
   private int rowGroupOrdinal;
+  private long currentRawBufferedSize = 0;
+  private long totalRawBufferedSize = 0;
+  private long totalRowGroupSize = 0;

   private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -132,7 +135,9 @@ private void ensureWriterInitialized() {
   @Override
   public void add(T value) {
     recordCount += 1;
+    long sizeBeforeWrite = writeStore.getBufferedSize();
     model.write(0, value);
+    this.currentRawBufferedSize += writeStore.getBufferedSize() - sizeBeforeWrite;
     writeStore.endRecord();
     checkSize();
   }

Review comment: We are now calling getBufferedSize for every value; this seems like a big perf difference. Can we run the write benchmarks with this on and off?
@@ -167,7 +172,7 @@ public long length() {
     if (!closed && recordCount > 0) {
       // recordCount > 0 when there are records in the write store that have not been flushed to
       // the Parquet file
-      length += writeStore.getBufferedSize();
+      length += estimateBufferedSize();
     }

     return length;
@@ -185,14 +190,25 @@ public List<Long> splitOffsets() {
     return null;
   }

+  /*
+   * Data size can shrink after being written out due to encoding and compression.
+   * Use the ratio totalRowGroupSize / totalRawBufferedSize to estimate the size after write-out.
+   */
+  private long estimateBufferedSize() {
+    if (totalRowGroupSize == 0 || totalRawBufferedSize == 0 || currentRawBufferedSize == 0) {
+      return writeStore.getBufferedSize();
+    }
+
+    return currentRawBufferedSize * totalRowGroupSize / totalRawBufferedSize;
+  }
+
   private void checkSize() {
     if (recordCount >= nextCheckRecordCount) {
-      long bufferedSize = writeStore.getBufferedSize();
-      double avgRecordSize = ((double) bufferedSize) / recordCount;
-
-      if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
+      long bufferedSize = estimateBufferedSize();
+      if (bufferedSize > targetRowGroupSize) {
         flushRowGroup(false);
       } else {
+        double avgRecordSize = ((double) bufferedSize) / recordCount;
         long remainingSpace = targetRowGroupSize - bufferedSize;
         long remainingRecords = (long) (remainingSpace / avgRecordSize);
         this.nextCheckRecordCount =
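To see the flush-decision flow in one place, here is a minimal, self-contained sketch of the logic in checkSize() above. It is an illustration, not Iceberg's actual class: the `RowGroupFlushPolicy` name, the `LongSupplier` standing in for `estimateBufferedSize()`, the initial check count, and the clamping of the next-check schedule (truncated in the hunk above) are all assumptions.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the adaptive size-check schedule from checkSize().
class RowGroupFlushPolicy {
  private final long targetRowGroupSize;
  private final LongSupplier estimatedBufferedSize; // stands in for estimateBufferedSize()

  private long recordCount = 0;
  private long nextCheckRecordCount = 100; // assumed initial value

  RowGroupFlushPolicy(long targetRowGroupSize, LongSupplier estimatedBufferedSize) {
    this.targetRowGroupSize = targetRowGroupSize;
    this.estimatedBufferedSize = estimatedBufferedSize;
  }

  // Returns true when the caller should flush the current row group.
  boolean recordAdded() {
    recordCount += 1;
    if (recordCount < nextCheckRecordCount) {
      return false; // skip the size estimate between checkpoints
    }

    long bufferedSize = estimatedBufferedSize.getAsLong();
    if (bufferedSize > targetRowGroupSize) {
      return true; // estimated on-disk size has reached the target
    }

    // Schedule the next check roughly halfway to the projected fill point.
    double avgRecordSize = ((double) bufferedSize) / recordCount;
    long remainingRecords = (long) ((targetRowGroupSize - bufferedSize) / avgRecordSize);
    nextCheckRecordCount = recordCount + Math.max(remainingRecords / 2, 1); // assumed clamp
    return false;
  }
}
```

The key change versus the old code is that the comparison now uses the ratio-scaled estimate rather than the raw buffered size, so the target is interpreted as on-disk bytes.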
@@ -211,6 +227,8 @@ private void flushRowGroup(boolean finished) {
       writer.startBlock(recordCount);
       writeStore.flush();
       pageStore.flushToFileWriter(writer);
+      totalRawBufferedSize += currentRawBufferedSize;
+      totalRowGroupSize += writeStore.getBufferedSize();
       writer.endBlock();
       if (!finished) {
         writeStore.close();

Review comment: I'm not sure I understand this part; what are we adding to totalRowGroupSize here?
@@ -245,6 +263,7 @@ private void startRowGroup() {
     this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, pageStore);

     model.setColumnStore(writeStore);
+    this.currentRawBufferedSize = 0;
   }

   @Override
@@ -36,6 +36,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -46,6 +47,7 @@
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -57,6 +59,7 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -219,6 +222,52 @@ public void testTwoLevelList() throws IOException {
     assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
   }

+  @Test
+  public void testParquetRowGroupSize() throws IOException {
+    // verify that the Parquet row group size stays close to the configured size
+    int recordCount = 100000;
+    int columnCount = 50;
+
+    List<Types.NestedField> columns =
+        IntStream.rangeClosed(1, columnCount)
+            .mapToObj(i -> optional(i, "stringCol" + i, Types.StringType.get()))
+            .collect(ImmutableList.toImmutableList());
+    Schema schema = new Schema(columns);
+
+    File file = createTempFile(temp);
+
+    List<GenericData.Record> records = Lists.newArrayListWithCapacity(recordCount);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    for (int i = 1; i <= recordCount; i++) {
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      for (Types.NestedField column : columns) {
+        String value = column.name().repeat(10) + i;
+        record.put(column.name(), value);
+      }
+
+      records.add(record);
+    }
+
+    long actualSize =
+        write(
+            file,
+            schema,
+            ImmutableMap.of("write.parquet.row-group-size-bytes", "1048576"),
+            ParquetAvroWriter::buildWriter,
+            records.toArray(new GenericData.Record[] {}));
+
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
+      ParquetMetadata footer = reader.getFooter();
+      for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
+        assertThat(footer.getBlocks().get(i).getCompressedSize())
+            .isBetween((long) 900 * 1024, (long) 1200 * 1024);
+      }
+
+      assertThat(footer.getBlocks().get(footer.getBlocks().size() - 1).getCompressedSize())
+          .isLessThan((long) 1200 * 1024);
+    }
+  }
+
   private Pair<File, Long> generateFile(
       Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
       int desiredRecordCount,

Review comment (on the loop starting at index 1): the test skips the first row group; the first row group is expected to be inaccurate in size.
Reply: can you please add a comment so that it is clear to a reader why the first one is skipped?

Review comment (on getCompressedSize): is it correct to look at the compressed size here instead of looking at …
Reply: if the Parquet row group size refers to the on-disk size, then it should be getCompressedSize().
@@ -557,8 +557,8 @@ public void testBinPackCombineMixedFiles() {

   @Test
   public void testBinPackCombineMediumFiles() {
-    Table table = createTable(4);
-    shouldHaveFiles(table, 4);
+    Table table = createTable(6);
+    shouldHaveFiles(table, 6);

     List<Object[]> expectedRecords = currentData();
     int targetSize = ((int) testDataSize(table) / 3);

Review comment (on the createTable change): after this change, the test table has a smaller number of row groups. A small change like 4 -> 3 may leave a small tail when generating task groups; using 6 -> 3 gives better tolerance.
Reply: I'm not sure I understand why changes to this test are being made.
Reply: 4/3 may produce groups like [1.4, 1.3, 1.3] or [1.3, 1.3, 1.3, 0.1]; a toy sketch of this arithmetic follows.
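Here is a toy illustration of the grouping arithmetic in that thread, assuming equal-sized inputs and a naive greedy packer (this is not Iceberg's actual bin-packing/split logic): with 4 units of data and a target of total/3, the greedy pass produces a fourth tail group, while 6 units pack evenly into exactly 3 groups.

```java
import java.util.ArrayList;
import java.util.List;

public class BinPackTailDemo {
  // Greedily pack equal-sized files into groups of at most `target` bytes.
  static List<Double> pack(int fileCount, double fileSize) {
    double target = fileCount * fileSize / 3; // same rule as the test: total / 3
    List<Double> groups = new ArrayList<>();
    double current = 0;
    for (int i = 0; i < fileCount; i++) {
      if (current > 0 && current + fileSize > target) {
        groups.add(current); // group is full; start a new one
        current = 0;
      }
      current += fileSize;
    }
    groups.add(current);
    return groups;
  }

  public static void main(String[] args) {
    System.out.println(pack(4, 1.0)); // [1.0, 1.0, 1.0, 1.0] -> 4 groups, not 3
    System.out.println(pack(6, 1.0)); // [2.0, 2.0, 2.0] -> exactly 3 groups
  }
}
```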
@@ -578,7 +578,7 @@ public void testBinPackCombineMediumFiles() {

     assertThat(result.rewrittenDataFilesCount())
-        .as("Action should delete 4 data files")
-        .isEqualTo(4);
+        .as("Action should delete 6 data files")
+        .isEqualTo(6);
     assertThat(result.addedDataFilesCount()).as("Action should add 3 data files").isEqualTo(3);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
Review comment: So for my own clarity, this is:
currentRawBufferedSize = the amount of "buffer" in the current row group, calculated as a diff in getBufferedSize()
totalRawBufferedSize = the amount of "buffer" in all row groups, calculated as a diff in getBufferedSize()
totalRowGroupSize = the amount of "buffer" in all row groups, calculated by just adding getBufferedSize() for each row group together
Is this right?

Reply: each row group's data has two different sizes, in-memory and written:
currentRawBufferedSize: the in-memory size of the current, unfinished row group
totalRawBufferedSize: sum(in_memory for rg in finished_row_groups)
totalRowGroupSize: sum(written for rg in finished_row_groups)
We use the totalRowGroupSize / totalRawBufferedSize ratio to estimate the size change from the in-memory to the written state.

Reply: Can we do some renames of the variables to match these definitions? I think it's hard (at least for me) to follow.

Reply: +1 for renaming the variables to make it clearer.

Reply: how about
currentRawBufferedSize -> currentRowGroupMemoryBufferSize
totalRawBufferedSize -> cumulativeMemoryBufferSize
totalRowGroupSize -> cumulativeWrittenSize