Skip to content

Commit

Permalink
Spark 3.1: Validate table columns don't conflict with metadata columns (
Browse files Browse the repository at this point in the history
apache#5501)

Backport of apache#3456.
  • Loading branch information
hililiwei authored Aug 14, 2022
1 parent 39c89ae commit 41a7152
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand Down Expand Up @@ -305,4 +308,20 @@ public static long estimateSize(StructType tableSchema, long totalRecords) {
}
return result;
}

public static void validateMetadataColumnReferences(Schema tableSchema, Schema readSchema) {
List<String> conflictingColumnNames =
readSchema.columns().stream()
.map(Types.NestedField::name)
.filter(
name ->
MetadataColumns.isMetadataColumn(name) && tableSchema.findField(name) != null)
.collect(Collectors.toList());

ValidationException.check(
conflictingColumnNames.isEmpty(),
"Table column names conflict with names reserved for Iceberg metadata columns: %s.\n"
+ "Please, use ALTER TABLE statements to rename the conflicting table columns.",
conflictingColumnNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
Schema expectedSchema,
List<Expression> filters,
CaseInsensitiveStringMap options) {

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -168,6 +169,42 @@ public void testPartitionMetadataColumnWithUnknownTransforms() {
() -> sql("SELECT _partition FROM %s", TABLE_NAME));
}

@Test
public void testConflictingColumns() {
table
.updateSchema()
.addColumn(MetadataColumns.SPEC_ID.name(), Types.IntegerType.get())
.addColumn(MetadataColumns.FILE_PATH.name(), Types.StringType.get())
.commit();

sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1', -1, 'path/to/file')", TABLE_NAME);

assertEquals(
"Rows must match",
ImmutableList.of(row(1L, "a1")),
sql("SELECT id, category FROM %s", TABLE_NAME));

AssertHelpers.assertThrows(
"Should fail to query conflicting columns",
ValidationException.class,
"column names conflict",
() -> sql("SELECT * FROM %s", TABLE_NAME));

table.refresh();

table
.updateSchema()
.renameColumn(MetadataColumns.SPEC_ID.name(), "_renamed" + MetadataColumns.SPEC_ID.name())
.renameColumn(
MetadataColumns.FILE_PATH.name(), "_renamed" + MetadataColumns.FILE_PATH.name())
.commit();

assertEquals(
"Rows must match",
ImmutableList.of(row(0, null, -1)),
sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s", TABLE_NAME));
}

private void createAndInitTable() throws IOException {
this.table =
TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned());
Expand Down

0 comments on commit 41a7152

Please sign in to comment.