
Commit 5cc90ed

Spark 3.5: Spark action to compute the partition stats
1 parent 468e3f9 commit 5cc90ed

File tree

11 files changed
+910 -1 lines changed
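For orientation before the per-file diffs: once the Spark side of this commit is wired in, the action is reached through the standard actions entry point. A hedged sketch — SparkActions.get() is the existing Iceberg Spark entry point; only computePartitionStatistics(), execute(), and outputFile() come from this commit:

    import org.apache.iceberg.PartitionStatisticsFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ComputePartitionStats;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Compute partition stats for the table and register them in table metadata.
    ComputePartitionStats.Result result =
        SparkActions.get(spark).computePartitionStatistics(table).execute();

    // Null if the table is non-partitioned or empty (per the Result contract below).
    PartitionStatisticsFile statsFile = result.outputFile();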

api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java

+6
@@ -70,4 +70,10 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
     throw new UnsupportedOperationException(
         this.getClass().getName() + " does not implement rewritePositionDeletes");
   }
+
+  /** Instantiates an action to compute partition statistics and register it to table metadata. */
+  default ComputePartitionStats computePartitionStatistics(Table table) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement computePartitionStatistics");
+  }
 }
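Engines opt in by overriding this throwing default, mirroring how the other actions on ActionsProvider are registered. A minimal sketch of such an override; SparkComputePartitionStats is a hypothetical name for the Spark-side implementation class, which is not shown in this excerpt:

    @Override
    public ComputePartitionStats computePartitionStatistics(Table table) {
      // Hypothetical Spark implementation class; the default above throws instead.
      return new SparkComputePartitionStats(spark, table);
    }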
api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java (new file)

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.PartitionStatisticsFile;
+
+/** An action to compute and register partition stats. */
+public interface ComputePartitionStats
+    extends Action<ComputePartitionStats, ComputePartitionStats.Result> {
+
+  /** The action result that contains a summary of the execution. */
+  interface Result {
+    /**
+     * Returns the output file which is registered to the table metadata, null if the table is
+     * non-partitioned or empty.
+     */
+    PartitionStatisticsFile outputFile();
+  }
+}
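Since the interface extends Action<ComputePartitionStats, ComputePartitionStats.Result>, callers get the usual fluent execute() contract from org.apache.iceberg.actions.Action. A sketch of consuming the result, honoring the documented null case:

    ComputePartitionStats.Result result = action.execute(); // 'action' from an ActionsProvider
    if (result.outputFile() == null) {
      // table is non-partitioned or empty; nothing was registered
    } else {
      long snapshotId = result.outputFile().snapshotId();
      String path = result.outputFile().path();
    }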

core/src/main/java/org/apache/iceberg/PartitionEntry.java

+99
@@ -18,11 +18,15 @@
  */
 package org.apache.iceberg;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
 
 public class PartitionEntry implements IndexedRecord {
   private PartitionData partitionData;
@@ -261,6 +265,24 @@ private static Schema prepareAvroSchema(Types.StructType partitionType) {
     return AvroSchemaUtil.convert(icebergSchema(partitionType), "partitionEntry");
   }
 
+  public synchronized PartitionEntry update(PartitionEntry entry) {
+    this.specId = Math.max(this.specId, entry.specId);
+    this.dataRecordCount += entry.dataRecordCount;
+    this.dataFileCount += entry.dataFileCount;
+    this.dataFileSizeInBytes += entry.dataFileSizeInBytes;
+    this.posDeleteRecordCount += entry.posDeleteRecordCount;
+    this.posDeleteFileCount += entry.posDeleteFileCount;
+    this.eqDeleteRecordCount += entry.eqDeleteRecordCount;
+    this.eqDeleteFileCount += entry.eqDeleteFileCount;
+    this.totalRecordCount += entry.totalRecordCount;
+    if (this.lastUpdatedAt < entry.lastUpdatedAt) {
+      this.lastUpdatedAt = entry.lastUpdatedAt;
+      this.lastUpdatedSnapshotId = entry.lastUpdatedSnapshotId;
+    }
+
+    return this;
+  }
+
   public static class Builder {
     private PartitionData partitionData;
     private int specId;
@@ -358,4 +380,81 @@ public PartitionEntry build() {
       return partition;
     }
   }
+
+  public static CloseableIterable<PartitionEntry> fromManifest(Table table, ManifestFile manifest) {
+    CloseableIterable<? extends ManifestEntry<? extends ContentFile<?>>> entries =
+        CloseableIterable.transform(
+            ManifestFiles.open(manifest, table.io(), table.specs())
+                .select(scanColumns(manifest.content())) // don't select stats columns
+                .liveEntries(),
+            t ->
+                (ManifestEntry<? extends ContentFile<?>>)
+                    // defensive copy of manifest entry without stats columns
+                    t.copyWithoutStats());
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    return CloseableIterable.transform(
+        entries, entry -> fromManifestEntry(entry, table, partitionType));
+  }
+
+  private static PartitionEntry fromManifestEntry(
+      ManifestEntry<?> entry, Table table, Types.StructType partitionType) {
+    PartitionEntry.Builder builder = PartitionEntry.builder();
+    builder
+        .withSpecId(entry.file().specId())
+        .withPartitionData(coercedPartitionData(entry.file(), table.specs(), partitionType));
+    Snapshot snapshot = table.snapshot(entry.snapshotId());
+    if (snapshot != null) {
+      builder
+          .withLastUpdatedSnapshotId(snapshot.snapshotId())
+          .withLastUpdatedAt(snapshot.timestampMillis());
+    }
+
+    switch (entry.file().content()) {
+      case DATA:
+        builder
+            .withDataFileCount(1)
+            .withDataRecordCount(entry.file().recordCount())
+            .withDataFileSizeInBytes(entry.file().fileSizeInBytes());
+        break;
+      case POSITION_DELETES:
+        builder.withPosDeleteFileCount(1).withPosDeleteRecordCount(entry.file().recordCount());
+        break;
+      case EQUALITY_DELETES:
+        builder.withEqDeleteFileCount(1).withEqDeleteRecordCount(entry.file().recordCount());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported file content type: " + entry.file().content());
+    }
+
+    // TODO: optionally compute TOTAL_RECORD_COUNT based on the flag
+    return builder.build();
+  }
+
+  private static PartitionData coercedPartitionData(
+      ContentFile<?> file, Map<Integer, PartitionSpec> specs, Types.StructType partitionType) {
+    // keep the partition data as per the unified spec by coercing
+    StructLike partition =
+        PartitionUtil.coercePartition(partitionType, specs.get(file.specId()), file.partition());
+    PartitionData data = new PartitionData(partitionType);
+    for (int i = 0; i < partitionType.fields().size(); i++) {
+      Object val = partition.get(i, partitionType.fields().get(i).type().typeId().javaClass());
+      if (val != null) {
+        data.set(i, val);
+      }
+    }
+    return data;
+  }
+
+  private static List<String> scanColumns(ManifestContent content) {
+    switch (content) {
+      case DATA:
+        return BaseScan.SCAN_COLUMNS;
+      case DELETES:
+        return BaseScan.DELETE_SCAN_COLUMNS;
+      default:
+        throw new UnsupportedOperationException("Cannot read unknown manifest type: " + content);
+    }
+  }
 }
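fromManifest(...) and update(...) compose into a single-process aggregation: stream entries per manifest, then merge them by partition. A sketch under the assumption that PartitionEntry exposes a partitionData() accessor matching the field above (the accessor is not visible in this diff):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;

    Map<PartitionData, PartitionEntry> statsByPartition = Maps.newHashMap();
    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
      try (CloseableIterable<PartitionEntry> entries = PartitionEntry.fromManifest(table, manifest)) {
        for (PartitionEntry entry : entries) {
          // update() sums the counters and keeps the newest snapshot id/timestamp
          statsByPartition.merge(entry.partitionData(), entry, PartitionEntry::update);
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }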
core/src/main/java/org/apache/iceberg/actions/BaseComputePartitionStats.java (new file)

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.immutables.value.Value;
+
+@Value.Enclosing
+@SuppressWarnings("ImmutablesStyle")
+@Value.Style(
+    typeImmutableEnclosing = "ImmutableComputePartitionStats",
+    visibilityString = "PUBLIC",
+    builderVisibilityString = "PUBLIC")
+interface BaseComputePartitionStats extends ComputePartitionStats {
+
+  @Value.Immutable
+  interface Result extends ComputePartitionStats.Result {}
+}
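Given the @Value.Style above, the annotation processor generates a public ImmutableComputePartitionStats enclosing class, so an implementation can assemble its result like this (a sketch; statsFile stands for the PartitionStatisticsFile the action wrote and committed):

    ComputePartitionStats.Result result =
        ImmutableComputePartitionStats.Result.builder().outputFile(statsFile).build();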

data/src/main/java/org/apache/iceberg/data/PartitionStatsUtil.java

+4 -1

@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionData;
@@ -49,10 +50,12 @@ private PartitionStatsUtil() {}
 
   public static OutputFile newPartitionStatsFile(
       TableOperations ops, long snapshotId, FileFormat format) {
+    // TODO: UUID is temp, remove it.
     return ops.io()
         .newOutputFile(
             ops.metadataFileLocation(
-                format.addExtension(String.format("partition-stats-%d", snapshotId))));
+                format.addExtension(
+                    String.format("partition-stats-%s-%d", UUID.randomUUID(), snapshotId))));
   }
 
   public static void writePartitionStatsFile(
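While the TODO stands, the random UUID keeps two runs against the same snapshot from colliding on one file name. A sketch of creating the output file, assuming a TableOperations handle obtained via HasTableOperations (an existing Iceberg interface):

    TableOperations ops = ((HasTableOperations) table).operations();
    OutputFile statsFile =
        PartitionStatsUtil.newPartitionStatsFile(
            ops, table.currentSnapshot().snapshotId(), FileFormat.PARQUET);
    // resolves to something like <metadata>/partition-stats-<random-uuid>-<snapshot-id>.parquet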

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java

+68
@@ -19,8 +19,13 @@
 package org.apache.iceberg.spark.actions;
 
 import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
 import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.first;
 import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.sum;
+import static org.apache.spark.sql.functions.when;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -42,6 +47,7 @@
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -72,6 +78,8 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -164,6 +172,66 @@ protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
     return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
   }
 
+  protected Dataset<Row> partitionEntryDS(Table table) {
+    Dataset<Row> dataset =
+        loadMetadataTable(table, ENTRIES)
+            .filter(col("status").$less(2)) // keep EXISTING (0) and ADDED (1), skip DELETED (2)
+            .select(
+                col("data_file.spec_id").as("SPEC_ID"),
+                col("data_file.partition").as("PARTITION_DATA"),
+                when(col("data_file.content").equalTo(0), col("data_file.record_count"))
+                    .otherwise(lit(0))
+                    .as("DATA_RECORD_COUNT"),
+                when(col("data_file.content").equalTo(0), lit(1))
+                    .otherwise(lit(0))
+                    .as("DATA_FILE_COUNT"),
+                when(col("data_file.content").equalTo(0), col("data_file.file_size_in_bytes"))
+                    .otherwise(lit(0))
+                    .as("DATA_FILE_SIZE_IN_BYTES"),
+                when(col("data_file.content").equalTo(1), col("data_file.record_count"))
+                    .otherwise(lit(0))
+                    .as("POSITION_DELETE_RECORD_COUNT"),
+                when(col("data_file.content").equalTo(1), lit(1))
+                    .otherwise(lit(0))
+                    .as("POSITION_DELETE_FILE_COUNT"),
+                when(col("data_file.content").equalTo(2), col("data_file.record_count"))
+                    .otherwise(lit(0))
+                    .as("EQUALITY_DELETE_RECORD_COUNT"),
+                when(col("data_file.content").equalTo(2), lit(1))
+                    .otherwise(lit(0))
+                    .as("EQUALITY_DELETE_FILE_COUNT"),
+                functions
+                    .udf(
+                        (Long snapshotId) -> lastUpdatedTime(snapshotId, table),
+                        DataTypes.LongType)
+                    .apply(col("snapshot_id"))
+                    .as("LAST_UPDATED_AT"),
+                col("snapshot_id").as("LAST_UPDATED_SNAPSHOT_ID"),
+                // TODO: not sure if this can be computed by this distributed algorithm.
+                // This was meant to be the effective count after applying deletes.
+                lit(0).alias("TOTAL_RECORD_COUNT"));
+
+    return dataset
+        .groupBy(col("PARTITION_DATA"))
+        .agg(
+            max(col("LAST_UPDATED_SNAPSHOT_ID")).as("LAST_UPDATED_SNAPSHOT_ID"),
+            first(col("LAST_UPDATED_AT")).as("LAST_UPDATED_AT"),
+            max(col("SPEC_ID")).as("SPEC_ID"),
+            sum(col("DATA_FILE_COUNT")).as("DATA_FILE_COUNT"),
+            sum(col("DATA_RECORD_COUNT")).as("DATA_RECORD_COUNT"),
+            sum(col("DATA_FILE_SIZE_IN_BYTES")).as("DATA_FILE_SIZE_IN_BYTES"),
+            sum(col("POSITION_DELETE_FILE_COUNT")).as("POSITION_DELETE_FILE_COUNT"),
+            sum(col("POSITION_DELETE_RECORD_COUNT")).as("POSITION_DELETE_RECORD_COUNT"),
+            sum(col("EQUALITY_DELETE_FILE_COUNT")).as("EQUALITY_DELETE_FILE_COUNT"),
+            sum(col("EQUALITY_DELETE_RECORD_COUNT")).as("EQUALITY_DELETE_RECORD_COUNT"),
+            sum(col("TOTAL_RECORD_COUNT")).as("TOTAL_RECORD_COUNT"));
+  }
+
+  public static long lastUpdatedTime(long snapshotId, Table table) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    return snapshot == null ? 0 : snapshot.timestampMillis();
+  }
+
   protected Dataset<FileInfo> manifestDS(Table table) {
     return manifestDS(table, null);
   }
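The groupBy above is the distributed counterpart of PartitionEntry.update(): max for the spec id and snapshot id, sum for the counters. One caveat worth noting: first(LAST_UPDATED_AT) without an explicit ordering returns an arbitrary row's timestamp, whereas update() pairs the timestamp with the winning snapshot id. A sketch of inspecting the aggregated rows from a subclass:

    // Each row carries one partition's aggregated stats under the column aliases above.
    Dataset<Row> partitionStats = partitionEntryDS(table);
    for (Row row : partitionStats.collectAsList()) {
      long dataFiles = row.getAs("DATA_FILE_COUNT");
      long dataRecords = row.getAs("DATA_RECORD_COUNT");
      long lastSnapshotId = row.getAs("LAST_UPDATED_SNAPSHOT_ID");
    }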
