Skip to content

Commit ffab9a8

Browse files
committed
Test performance
1 parent 1cb679d commit ffab9a8

File tree

2 files changed

+153
-1
lines changed

2 files changed

+153
-1
lines changed

data/src/main/java/org/apache/iceberg/data/PartitionStatsUtil.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collection;
2424
import java.util.Iterator;
2525
import java.util.List;
26+
import java.util.UUID;
2627
import org.apache.avro.generic.GenericData;
2728
import org.apache.iceberg.FileFormat;
2829
import org.apache.iceberg.PartitionData;
@@ -49,10 +50,12 @@ private PartitionStatsUtil() {}
4950

5051
/**
 * Creates the {@link OutputFile} that a partition statistics file for the given snapshot is
 * written to.
 *
 * <p>The file name is built from the snapshot id, gets the extension of {@code format} via
 * {@code format.addExtension(...)}, and is resolved through {@code ops.metadataFileLocation(...)}
 * before being opened with {@code ops.io().newOutputFile(...)}.
 *
 * <p>NOTE(review): this diff hunk shows both the removed name pattern
 * ({@code partition-stats-%d}) and the added one ({@code partition-stats-%s-%d}, prefixed with a
 * random UUID). With the UUID, two calls for the same snapshot id produce different file names.
 *
 * @param ops table operations used to resolve the metadata location and to obtain the file IO
 * @param snapshotId id of the snapshot the statistics belong to; embedded in the file name
 * @param format file format whose extension is appended to the file name
 * @return a new output file for the partition statistics
 */
public static OutputFile newPartitionStatsFile(
5152
TableOperations ops, long snapshotId, FileFormat format) {
53+
// TODO: the random UUID in the file name is temporary (added for the perf test); remove it
// and go back to the plain "partition-stats-<snapshotId>" name.
5254
return ops.io()
5355
.newOutputFile(
5456
ops.metadataFileLocation(
55-
format.addExtension(String.format("partition-stats-%d", snapshotId))));
57+
format.addExtension(
58+
String.format("partition-stats-%s-%d", UUID.randomUUID(), snapshotId))));
5659
}
5760

5861
public static void writePartitionStatsFile(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.actions;
20+
21+
import static org.apache.iceberg.types.Types.NestedField.optional;
22+
23+
import java.io.IOException;
24+
import java.util.List;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.iceberg.AppendFiles;
27+
import org.apache.iceberg.DataFile;
28+
import org.apache.iceberg.FileGenerationUtil;
29+
import org.apache.iceberg.Files;
30+
import org.apache.iceberg.PartitionEntry;
31+
import org.apache.iceberg.PartitionSpec;
32+
import org.apache.iceberg.Partitioning;
33+
import org.apache.iceberg.Schema;
34+
import org.apache.iceberg.StructLike;
35+
import org.apache.iceberg.Table;
36+
import org.apache.iceberg.TestHelpers;
37+
import org.apache.iceberg.actions.ComputePartitionStats;
38+
import org.apache.iceberg.data.PartitionStatsUtil;
39+
import org.apache.iceberg.hadoop.HadoopTables;
40+
import org.apache.iceberg.io.CloseableIterable;
41+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
42+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
43+
import org.apache.iceberg.spark.SparkTestBase;
44+
import org.apache.iceberg.types.Types;
45+
import org.apache.spark.sql.Encoders;
46+
import org.apache.spark.sql.Row;
47+
import org.assertj.core.api.Assertions;
48+
import org.junit.Before;
49+
import org.junit.Rule;
50+
import org.junit.Test;
51+
import org.junit.rules.TemporaryFolder;
52+
53+
public class TestPartitionStatsPerf extends SparkTestBase {
54+
55+
private static final HadoopTables TABLES = new HadoopTables(new Configuration());
56+
protected static final Schema SCHEMA =
57+
new Schema(
58+
optional(1, "c1", Types.IntegerType.get()),
59+
optional(2, "c2", Types.StringType.get()),
60+
optional(3, "c3", Types.StringType.get()));
61+
protected static final PartitionSpec SPEC =
62+
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
63+
64+
@Rule public TemporaryFolder temp = new TemporaryFolder();
65+
protected String tableLocation = null;
66+
67+
@Before
68+
public void setupTableLocation() throws Exception {
69+
this.tableLocation = temp.newFolder().toURI().toString();
70+
}
71+
72+
@Test
73+
public void testPerf() {
74+
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
75+
final int partitionCount = 20;
76+
final int datafilesPerPartitionCount = 10000;
77+
78+
for (int partitionOrdinal = 0; partitionOrdinal < partitionCount; partitionOrdinal++) {
79+
StructLike partition = TestHelpers.Row.of(partitionOrdinal);
80+
81+
AppendFiles appendFiles = table.newAppend();
82+
83+
for (int fileOrdinal = 0; fileOrdinal < datafilesPerPartitionCount; fileOrdinal++) {
84+
DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
85+
appendFiles.appendFile(dataFile);
86+
}
87+
88+
appendFiles.commit();
89+
}
90+
91+
List<String> validFiles =
92+
spark
93+
.read()
94+
.format("iceberg")
95+
.load(tableLocation + "#files")
96+
.select("file_path")
97+
.as(Encoders.STRING())
98+
.collectAsList();
99+
Assertions.assertThat(validFiles).hasSize(partitionCount * datafilesPerPartitionCount);
100+
Assertions.assertThat(table.partitionStatisticsFiles()).isEmpty();
101+
102+
// -- local compute --------
103+
long base = System.currentTimeMillis();
104+
SparkActions actions = SparkActions.get();
105+
ComputePartitionStats.Result result =
106+
actions.computePartitionStatistics(table).localCompute(true).execute();
107+
Assertions.assertThat(table.partitionStatisticsFiles()).containsExactly(result.outputFile());
108+
System.out.println(
109+
"#### time taken for local compute in milli: " + (System.currentTimeMillis() - base));
110+
111+
// read the partition entries from the stats file
112+
Schema schema =
113+
PartitionEntry.icebergSchema(Partitioning.partitionType(table.specs().values()));
114+
List<PartitionEntry> rows;
115+
try (CloseableIterable<PartitionEntry> recordIterator =
116+
PartitionStatsUtil.readPartitionStatsFile(
117+
schema, Files.localInput(result.outputFile().path()))) {
118+
rows = Lists.newArrayList(recordIterator);
119+
} catch (IOException e) {
120+
throw new RuntimeException(e);
121+
}
122+
Assertions.assertThat(rows.size()).isEqualTo(partitionCount);
123+
124+
// ---- distributed compute ---
125+
table
126+
.updatePartitionStatistics()
127+
.removePartitionStatistics(result.outputFile().snapshotId())
128+
.commit();
129+
Assertions.assertThat(table.partitionStatisticsFiles()).isEmpty();
130+
131+
base = System.currentTimeMillis();
132+
actions = SparkActions.get();
133+
result = actions.computePartitionStatistics(table).localCompute(false).execute();
134+
Assertions.assertThat(table.partitionStatisticsFiles()).containsExactly(result.outputFile());
135+
System.out.println(
136+
"#### time taken for distributed compute in milli: " + (System.currentTimeMillis() - base));
137+
138+
// can't use PartitionStatsUtil.readPartitionStatsFile as it uses
139+
// ParquetAvroValueReaders$ReadBuilder
140+
// and since native parquet doesn't write column ids, reader throws NPE.
141+
List<Row> output =
142+
spark
143+
.read()
144+
.parquet(result.outputFile().path())
145+
.select("PARTITION_DATA", "DATA_RECORD_COUNT", "DATA_FILE_COUNT")
146+
.collectAsList();
147+
Assertions.assertThat(output.size()).isEqualTo(partitionCount);
148+
}
149+
}

0 commit comments

Comments
 (0)