20 changes: 19 additions & 1 deletion api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -132,6 +132,12 @@ public StructType partitionType() {
     for (PartitionField field : fields) {
       Type sourceType = schema.findType(field.sourceId());
       Type resultType = field.transform().getResultType(sourceType);
+
+      // When the source field has been dropped, we cannot determine the type
+      if (sourceType == null) {
+        resultType = Types.UnknownType.get();
+      }
+
       structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
     }

@@ -614,8 +620,12 @@ Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
     }
 
     public PartitionSpec build() {
+      return build(false);
+    }
+
+    public PartitionSpec build(boolean allowMissingFields) {
       PartitionSpec spec = buildUnchecked();
-      checkCompatibility(spec, schema);
+      checkCompatibility(spec, schema, allowMissingFields);
       return spec;
     }

@@ -625,10 +635,18 @@ PartitionSpec buildUnchecked() {
   }
 
   static void checkCompatibility(PartitionSpec spec, Schema schema) {
+    checkCompatibility(spec, schema, false);
+  }
+
+  static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) {
     final Map<Integer, Integer> parents = TypeUtil.indexParents(schema.asStruct());
     for (PartitionField field : spec.fields) {
       Type sourceType = schema.findType(field.sourceId());
       Transform<?, ?> transform = field.transform();
+      // If the underlying field has been dropped, we cannot check compatibility
+      if (allowMissingFields && sourceType == null) {
+        continue;
+      }
       // In the case of a Version 1 partition-spec field gets deleted,
       // it is replaced with a void transform, see:
       // https://iceberg.apache.org/spec/#partition-transforms
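Taken together, these changes let a spec be rebuilt against a schema that no longer contains every source column: `build(true)` skips the compatibility check for missing sources, and `partitionType()` reports the orphaned field with an unknown type instead of failing. A minimal sketch of the resulting behavior, not part of the PR; the schema and field names are illustrative:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DroppedSourceFieldSketch {
  public static void main(String[] args) {
    Schema original =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "category", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(original).identity("category").build();

    // Simulate the current schema after `category` (source id 2) was dropped
    Schema evolved = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // Re-binding goes through build(true) via the parser path, so the missing
    // source column no longer fails the compatibility check
    PartitionSpec rebound =
        PartitionSpecParser.fromJson(evolved, PartitionSpecParser.toJson(spec));

    // The orphaned partition field is reported with an unknown result type
    System.out.println(rebound.partitionType());
  }
}
```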
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
@@ -46,6 +46,10 @@ public PartitionSpec bind(Schema schema) {
     return copyToBuilder(schema).build();
   }
 
+  public PartitionSpec bind(Schema schema, boolean ignoreMissingFields) {
+    return copyToBuilder(schema).build(ignoreMissingFields);
+  }
+
   PartitionSpec bindUnchecked(Schema schema) {
     return copyToBuilder(schema).buildUnchecked();
   }
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
@@ -68,7 +68,7 @@ public static String toJson(UnboundPartitionSpec spec, boolean pretty) {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    return fromJson(json).bind(schema);
+    return fromJson(json).bind(schema, true);
   }
 
   public static UnboundPartitionSpec fromJson(JsonNode json) {
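Because `fromJson` now binds with `true`, spec JSON read from table metadata can reference a column that has since been dropped without failing. A hypothetical sketch; the schema and JSON literal are invented for illustration:

```java
// The current schema no longer contains the spec's source column (source-id 2)
Schema current = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

String specJson =
    "{\"spec-id\":1,\"fields\":[{\"name\":\"category\",\"transform\":\"identity\","
        + "\"source-id\":2,\"field-id\":1000}]}";

// Before this change the bind step rejected the missing source column;
// now parsing succeeds and the orphaned field is typed as unknown
PartitionSpec spec = PartitionSpecParser.fromJson(current, specJson);
```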
8 changes: 5 additions & 3 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec
    */
   public static StructType partitionType(Table table) {
     Collection<PartitionSpec> specs = table.specs().values();
-    return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
+    return buildPartitionProjectionType(
+        "table partition", specs, allActiveFieldIds(table.schema(), specs));
   }
 
   /**
@@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?>
         || t2.equals(Transforms.alwaysNull());
   }
 
-  // collects IDs of all partition field used across specs
-  private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
+  // collects IDs of all partition fields used across specs that are in the current schema
+  private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
     return FluentIterable.from(specs)
         .transformAndConcat(PartitionSpec::fields)
+        .filter(field -> schema.findField(field.sourceId()) != null)
         .transform(PartitionField::fieldId)
         .toSet();
   }
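For readers less familiar with Guava's FluentIterable chain, the new `allActiveFieldIds` is equivalent to this plain-loop sketch; it is illustrative, not the PR's code, and assumes the usual `java.util` and Iceberg type imports:

```java
// Illustrative plain-Java equivalent of allActiveFieldIds: collect field IDs
// only for partition fields whose source column still exists in the schema
private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
  Set<Integer> fieldIds = new HashSet<>();
  for (PartitionSpec spec : specs) {
    for (PartitionField field : spec.fields()) {
      // skip partition fields whose source column was dropped
      if (schema.findField(field.sourceId()) != null) {
        fieldIds.add(field.fieldId());
      }
    }
  }
  return fieldIds;
}
```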
30 changes: 29 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -172,7 +172,7 @@ public void testPartitionTypeWithIncompatibleSpecEvolution() {
 
     PartitionSpec newSpec = PartitionSpec.builderFor(table.schema()).identity("category").build();
 
-    TableOperations ops = ((HasTableOperations) table).operations();
+    TableOperations ops = table.operations();
     TableMetadata current = ops.current();
     ops.commit(current, current.updatePartitionSpec(newSpec));
 
@@ -183,6 +183,34 @@
         .hasMessageStartingWith("Conflicting partition fields");
   }
 
+  @Test
+  public void testPartitionTypeIgnoreInactiveFields() {
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", SCHEMA, BY_DATA_CATEGORY_BUCKET_SPEC, V2_FORMAT_VERSION);
+
+    StructType actualType = Partitioning.partitionType(table);
+    assertThat(actualType)
+        .isEqualTo(
+            StructType.of(
+                NestedField.optional(1000, "data", Types.StringType.get()),
+                NestedField.optional(1001, "category_bucket", Types.IntegerType.get())));
+
+    // Remove the partition field, then drop its source column
+    table.updateSpec().removeField("category_bucket").commit();
+    table.updateSchema().deleteColumn("category").commit();
+
+    actualType = Partitioning.partitionType(table);
+    assertThat(actualType)
+        .isEqualTo(StructType.of(NestedField.optional(1000, "data", Types.StringType.get())));
+
+    table.updateSpec().removeField("data").commit();
+    table.updateSchema().deleteColumn("data").commit();
+
+    actualType = Partitioning.partitionType(table);
+    assertThat(actualType).isEqualTo(StructType.of());
+  }
+
   @Test
   public void testGroupingKeyTypeWithSpecEvolutionInV1Tables() {
     TestTables.TestTable table =
@@ -20,13 +20,17 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -583,4 +587,46 @@ private void createTable(String schema, String spec) {
         tableName, schema, spec, TableProperties.FORMAT_VERSION, formatVersion);
   }
 
+  private void runCreateAndDropPartitionField(
+      String column, String partitionType, List<Object[]> expected, String predicate) {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql(
+        "CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)",
+        tableName, formatVersion);
+    sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as TIMESTAMP), 2100)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, partitionType);
+    sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as TIMESTAMP), 2200)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName);
+    sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as TIMESTAMP), 2300)", tableName);
+    sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);
+
+    assertEquals(
+        "Should return correct data",
+        expected,
+        sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, predicate));
+  }
+
+  @TestTemplate
+  public void testDropPartitionAndSourceColumnLong() {
+    String predicateTs = "col_long >= 2200";
+    List<Object[]> expectedTs =
+        Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L});
+    runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, predicateTs);
+    runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, predicateTs);
+    runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, predicateTs);
+    runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, predicateTs);
+  }
+
+  @TestTemplate
+  public void testDropPartitionAndSourceColumnTimestamp() {
+    String predicate = "col_ts >= '2024-04-01 19:25:00'";
+    List<Object[]> expected =
+        Lists.newArrayList(
+            new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)},
+            new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)});
+    runCreateAndDropPartitionField("col_long", "col_long", expected, predicate);
+    runCreateAndDropPartitionField("col_long", "truncate(2, col_long)", expected, predicate);
+    runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expected, predicate);
+  }
 }