diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index ffbcbeee156e..8db92e1897a2 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -7,6 +7,10 @@ acceptedBreaks:
       old: "class org.apache.iceberg.PartitionKey"
       new: "class org.apache.iceberg.PartitionKey"
       justification: "Serialization across versions is not supported"
+    - code: "java.class.defaultSerializationChanged"
+      old: "class org.apache.iceberg.types.Types.NestedField"
+      new: "class org.apache.iceberg.types.Types.NestedField"
+      justification: "Serialization across versions is not supported"
     - code: "java.class.removed"
       old: "interface org.apache.iceberg.Rollback"
       justification: "Deprecations for 1.0 release"
@@ -16,9 +20,15 @@ acceptedBreaks:
     - code: "java.method.addedToInterface"
       new: "method java.util.List org.apache.iceberg.Table::statisticsFiles()"
       justification: "new API method"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
+      justification: "Adding table scan APIs to support scanning from refs"
     - code: "java.method.removed"
       old: "method java.lang.Iterable org.apache.iceberg.Snapshot::addedFiles()"
       justification: "Deprecations for 1.0 release"
+    - code: "java.method.removed"
+      old: "method java.lang.Iterable org.apache.iceberg.Snapshot::deletedFiles()"
+      justification: "Deprecations for 1.0 release"
     - code: "java.method.removed"
       old: "method java.util.List org.apache.iceberg.Snapshot::allManifests()"
       justification: "Deprecations for 1.0 release"
@@ -28,9 +38,6 @@ acceptedBreaks:
     - code: "java.method.removed"
       old: "method java.util.List org.apache.iceberg.Snapshot::deleteManifests()"
       justification: "Deprecations for 1.0 release"
-    - code: "java.method.removed"
-      old: "method java.lang.Iterable org.apache.iceberg.Snapshot::deletedFiles()"
-      justification: "Deprecations for 1.0 release"
     - code: "java.method.removed"
       old: "method org.apache.iceberg.OverwriteFiles org.apache.iceberg.OverwriteFiles::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
       justification: "Deprecations for 1.0 release"
@@ -40,9 +47,6 @@ acceptedBreaks:
     - code: "java.method.removed"
       old: "method org.apache.iceberg.RowDelta org.apache.iceberg.RowDelta::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
       justification: "Deprecations for 1.0 release"
-    - code: "java.method.addedToInterface"
-      new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
-      justification: "Adding table scan APIs to support scanning from refs"
   release-base-0.13.0:
     org.apache.iceberg:iceberg-api:
     - code: "java.class.defaultSerializationChanged"
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 07586faaf47d..8db97e874819 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -419,7 +419,7 @@ public static NestedField optional(int id, String name, Type type, String doc)
       return new NestedField(true, id, name, type, doc, null, null);
     }
 
-    public static NestedField optional(
+    protected static NestedField optional(
         int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) {
       return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
     }
@@ -464,7 +464,7 @@ public static NestedField of(
     private final Object initialDefault;
     private final Object writeDefault;
 
-    private NestedField(
+    protected NestedField(
        boolean isOptional,
        int id,
        String name,
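Editorial note on the Types.java hunks above: making the default-aware optional(...) factory and the NestedField constructor protected keeps initialDefault/writeDefault out of the public API for now, while still letting same-package code and subclasses (such as the test helper introduced later in this diff) build fields that carry a default. A minimal sketch of what that enables; the DefaultedFields helper and the default value 34 are hypothetical, not part of this change:

```java
package org.apache.iceberg.types;

// Hypothetical helper: it lives in the same package as Types.NestedField,
// so the protected optional(...) overload above is accessible to it.
public final class DefaultedFields {
  private DefaultedFields() {}

  // Builds an optional int field whose initial (read) default and write default
  // are both 34; readers fill in 34 for files written before the column existed.
  public static Types.NestedField optionalIntWithDefault(int id, String name) {
    return Types.NestedField.optional(id, name, Types.IntegerType.get(), null, 34, 34);
  }
}
```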
diff --git a/core/src/main/java/org/apache/iceberg/SingleValueParser.java b/core/src/main/java/org/apache/iceberg/SingleValueParser.java
index 1d94929fbc51..2a9f490ec4d7 100644
--- a/core/src/main/java/org/apache/iceberg/SingleValueParser.java
+++ b/core/src/main/java/org/apache/iceberg/SingleValueParser.java
@@ -153,7 +153,7 @@ public static Object fromJson(Type type, JsonNode defaultValue) {
           defaultLength);
       byte[] fixedBytes =
           BaseEncoding.base16().decode(defaultValue.textValue().toUpperCase(Locale.ROOT));
-      return ByteBuffer.wrap(fixedBytes);
+      return fixedBytes;
     case BINARY:
       Preconditions.checkArgument(
           defaultValue.isTextual(), "Cannot parse default as a %s value: %s", type, defaultValue);
@@ -242,7 +242,7 @@ public static String toJson(Type type, Object defaultValue, boolean pretty) {
     return JsonUtil.generate(gen -> toJson(type, defaultValue, gen), pretty);
   }
 
-  @SuppressWarnings("checkstyle:MethodLength")
+  @SuppressWarnings({"checkstyle:MethodLength", "checkstyle:CyclomaticComplexity"})
   public static void toJson(Type type, Object defaultValue, JsonGenerator generator)
       throws IOException {
     if (defaultValue == null) {
@@ -309,17 +309,28 @@ public static void toJson(Type type, Object defaultValue, JsonGenerator generato
         generator.writeString(defaultValue.toString());
         break;
       case FIXED:
+        // Normally, FIXED is backed by a byte[], but it can also be backed by a ByteBuffer in the
+        // case of Fixed Literals. Ideally, Fixed Literals would be backed by a byte[], but that
+        // would make the APIs backwards incompatible.
+        // See {@link org.apache.iceberg.expressions.Literals.FixedLiteral}.
         Preconditions.checkArgument(
-            defaultValue instanceof ByteBuffer, "Invalid default %s value: %s", type, defaultValue);
-        ByteBuffer byteBufferValue = (ByteBuffer) defaultValue;
+            defaultValue instanceof byte[] || defaultValue instanceof ByteBuffer,
+            "Invalid default %s value: %s",
+            type,
+            defaultValue);
+        byte[] byteArrayValue;
+        if (defaultValue instanceof ByteBuffer) {
+          byteArrayValue = ByteBuffers.toByteArray((ByteBuffer) defaultValue);
+        } else {
+          byteArrayValue = (byte[]) defaultValue;
+        }
         int expectedLength = ((Types.FixedType) type).length();
         Preconditions.checkArgument(
-            byteBufferValue.remaining() == expectedLength,
+            byteArrayValue.length == expectedLength,
             "Invalid default %s value, incorrect length: %s",
             type,
-            byteBufferValue.remaining());
-        generator.writeString(
-            BaseEncoding.base16().encode(ByteBuffers.toByteArray(byteBufferValue)));
+            byteArrayValue.length);
+        generator.writeString(BaseEncoding.base16().encode(byteArrayValue));
         break;
       case BINARY:
         Preconditions.checkArgument(
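With the SingleValueParser change above, FIXED defaults round-trip through byte[]: fromJson now returns the raw array, and toJson accepts either a byte[] or a ByteBuffer, normalizing the latter before encoding. A standalone sketch of that base16 round trip; it assumes plain Guava rather than Iceberg's relocated copy, and the class name is illustrative:

```java
import com.google.common.io.BaseEncoding;
import java.nio.ByteBuffer;

public class FixedDefaultRoundTrip {
  public static void main(String[] args) {
    // Parse side: a FIXED default stored as upper-case hex becomes a raw byte[].
    byte[] parsed = BaseEncoding.base16().decode("0102CAFE");

    // Write side: a ByteBuffer-backed value (e.g. from a fixed literal) is
    // copied out to byte[] first, so both representations serialize identically.
    ByteBuffer buffer = ByteBuffer.wrap(parsed);
    byte[] normalized = new byte[buffer.remaining()];
    buffer.duplicate().get(normalized);

    System.out.println(BaseEncoding.base16().encode(parsed)); // 0102CAFE
    System.out.println(BaseEncoding.base16().encode(normalized)); // 0102CAFE
  }
}
```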
diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index 38d822d87eac..c5866e914b26 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -105,14 +105,14 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
               (field.isRequired() && field.initialDefault() != null)
                   || field.isOptional()
                   || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
-              "Missing required field that doesn't have a default value: %s",
+              "Missing required field: %s",
               field.name());
-          // If the field from Iceberg schema has initial default value, we just pass and don't
-          // project it to the avro file read schema with the generated _r field name,
-          // the default value will be directly read from the Iceberg layer
+          // If the field has an initial default value, do not project it in the read schema.
+          // The default will be added in {@link org.apache.iceberg.avro.ValueReaders.StructReader}.
          if (field.initialDefault() != null) {
            continue;
          }
+
          // Create a field that will be defaulted to null. We assign a unique suffix to the field
          // to make sure that even if records in the file have the field it is not projected.
          Schema.Field newField =
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 59f499ed9ed8..f7254d0aaec1 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -566,8 +566,10 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {
   public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
     private final ValueReader<?>[] readers;
-    private final int[] positions;
-    private final Object[] constants;
+    private final int[] constantValuesPositions;
+    private final Object[] constantValues;
+    private final int[] defaultValuesPositions;
+    private final Object[] defaultValues;
     private int posField = -1;
 
     protected StructReader(List<ValueReader<?>> readers, Schema schema) {
@@ -586,43 +588,51 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
       }
 
       if (isDeletedColumnPos == null) {
-        this.positions = new int[0];
-        this.constants = new Object[0];
+        this.constantValuesPositions = new int[0];
+        this.constantValues = new Object[0];
       } else {
-        this.positions = new int[] {isDeletedColumnPos};
-        this.constants = new Object[] {false};
+        this.constantValuesPositions = new int[] {isDeletedColumnPos};
+        this.constantValues = new Object[] {false};
       }
+
+      this.defaultValuesPositions = new int[0];
+      this.defaultValues = new Object[0];
     }
 
     protected StructReader(
-        List<ValueReader<?>> readers,
-        Types.StructType struct,
-        Schema record,
-        Map<Integer, ?> idToConstant) {
+        List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
       this.readers = readers.toArray(new ValueReader[0]);
 
       List<Types.NestedField> fields = struct.fields();
-      List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
-      List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
+      List<Integer> constantValuesPositionList = Lists.newArrayListWithCapacity(fields.size());
+      List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
+      List<Integer> defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size());
+      List<Object> defaultValuesList = Lists.newArrayListWithCapacity(fields.size());
       for (int pos = 0; pos < fields.size(); pos += 1) {
         Types.NestedField field = fields.get(pos);
         if (idToConstant.containsKey(field.fieldId())) {
-          positionList.add(pos);
-          constantList.add(idToConstant.get(field.fieldId()));
+          constantValuesPositionList.add(pos);
+          constantValuesList.add(idToConstant.get(field.fieldId()));
         } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
           // track where the _pos field is located for setRowPositionSupplier
           this.posField = pos;
         } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
-          positionList.add(pos);
-          constantList.add(false);
-        } else if (record.getField(field.name()) == null && field.initialDefault() != null) {
-          positionList.add(pos);
-          constantList.add(field.initialDefault());
+          constantValuesPositionList.add(pos);
+          constantValuesList.add(false);
+        } else if (field.initialDefault() != null) {
+          // Record the default value for fields that have one. In the {@link #read()} method,
+          // the default is applied before decoding, so a value read from the file for the same
+          // position overwrites it.
+          defaultValuesPositionList.add(pos);
+          defaultValuesList.add(field.initialDefault());
        }
      }
 
-      this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
-      this.constants = constantList.toArray();
+      this.constantValuesPositions =
+          constantValuesPositionList.stream().mapToInt(Integer::intValue).toArray();
+      this.constantValues = constantValuesList.toArray();
+      this.defaultValuesPositions =
+          defaultValuesPositionList.stream().mapToInt(Integer::intValue).toArray();
+      this.defaultValues = defaultValuesList.toArray();
    }
 
    @Override
@@ -659,13 +669,18 @@ public ValueReader<?> reader(int pos) {
     public S read(Decoder decoder, Object reuse) throws IOException {
       S struct = reuseOrCreate(reuse);
 
+      // Set the default values first, so that a value decoded from the file for the same field
+      // overrides its default below.
+      for (int i = 0; i < defaultValuesPositions.length; i += 1) {
+        set(struct, defaultValuesPositions[i], defaultValues[i]);
+      }
+
       if (decoder instanceof ResolvingDecoder) {
         // this may not set all of the fields. nulls are set by default.
         for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
           Object reusedValue = get(struct, field.pos());
           set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
         }
-
       } else {
         for (int i = 0; i < readers.length; i += 1) {
           Object reusedValue = get(struct, i);
@@ -673,8 +688,8 @@ public S read(Decoder decoder, Object reuse) throws IOException {
         }
       }
 
-      for (int i = 0; i < positions.length; i += 1) {
-        set(struct, positions[i], constants[i]);
+      for (int i = 0; i < constantValuesPositions.length; i += 1) {
+        set(struct, constantValuesPositions[i], constantValues[i]);
       }
 
       return struct;
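The read() ordering above is the crux of default-value support: defaults are written into the struct before any bytes are decoded, so a field present in the file simply overwrites its default, while a field missing from the file keeps it. An illustrative sketch of that ordering, with plain arrays standing in for Avro decoding (all names here are made up):

```java
import java.util.Arrays;

public class DefaultsThenDecode {
  public static void main(String[] args) {
    Object[] struct = new Object[3];

    // Step 1: apply defaults, mirroring defaultValuesPositions/defaultValues.
    int[] defaultValuesPositions = {2}; // position 2 has an initial default of 34
    Object[] defaultValues = {34};
    for (int i = 0; i < defaultValuesPositions.length; i += 1) {
      struct[defaultValuesPositions[i]] = defaultValues[i];
    }

    // Step 2: "decode" the fields the file actually contains (positions 0 and 1).
    // Position 2 is absent from the file, so its default survives.
    struct[0] = "a";
    struct[1] = "b";

    System.out.println(Arrays.toString(struct)); // [a, b, 34]
  }
}
```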
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 4c41ecaa59f2..631c50486564 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -113,7 +113,7 @@ private GenericRecordReader(
         StructType struct,
         Schema record,
         Map<Integer, ?> idToConstant) {
-      super(readers, struct, record, idToConstant);
+      super(readers, struct, idToConstant);
       this.structType = struct;
     }
 
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
index fe348117d9b9..c6bdb88b7d97 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
@@ -205,7 +205,7 @@ public void testMissingRequiredFields() {
     AssertHelpers.assertThrows(
         "Missing required field in nameMapping",
         IllegalArgumentException.class,
-        "Missing required field that doesn't have a default value: x",
+        "Missing required field: x",
         // In this case, pruneColumns result is an empty record
         () -> writeAndRead(writeSchema, readSchema, record, nameMapping));
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
index ae5e05923649..d4813ca7bc77 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
@@ -18,12 +18,10 @@
  */
 package org.apache.iceberg.data;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 
@@ -90,16 +88,6 @@ private static void assertEquals(Type type, Object expected, Object actual) {
             "Primitive value should be equal to expected for type " + type, expected, actual);
         break;
       case FIXED:
-        // For fixed type, Iceberg actually represent value as Bytebuffer,
-        // but since RandomGenericData generates bytearray data for fixed type
-        // for it to be written to Avro, we add a conversion here to make the
-        // equality comparison consistent using bytearray
-        if (expected instanceof ByteBuffer) {
-          expected = ByteBuffers.toByteArray((ByteBuffer) expected);
-        }
-        if (actual instanceof ByteBuffer) {
-          actual = ByteBuffers.toByteArray((ByteBuffer) actual);
-        }
         Assertions.assertThat(expected)
             .as("Expected should be a byte[]")
            .isInstanceOf(byte[].class);
diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
index 3cc8b9ececbf..0f98d2e5712f 100644
--- a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
+++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
@@ -125,15 +125,16 @@ public void writeAndValidate() throws IOException {
       Schema readerSchema =
           new Schema(
               required(999, "col1", Types.IntegerType.get()),
-              optional(1000, "col2", type, null, defaultValue, defaultValue));
+              NestedFieldWithInitialDefault.optionalWithDefault(
+                  1000, "col2", type, null, defaultValue, defaultValue));
 
       List<Record> generatedRecords = RandomGenericData.generate(writerSchema, 100, 0L);
       List<Record> expected = Lists.newArrayList();
       for (Record record : generatedRecords) {
-        Record r = GenericRecord.create(readerSchema);
-        r.set(0, record.get(0));
-        r.set(1, defaultValue);
-        expected.add(r);
+        Record expectedRecord = GenericRecord.create(readerSchema);
+        expectedRecord.set(0, record.get(0));
+        expectedRecord.set(1, defaultValue);
+        expected.add(expectedRecord);
       }
 
       File testFile = temp.newFile();
@@ -164,4 +165,34 @@ public void writeAndValidate() throws IOException {
       }
     }
   }
+
+  // TODO: This class should be removed once the NestedField.optional() overload that takes
+  // initialDefault and writeDefault becomes public. It is intentionally package-private to
+  // avoid exposing it in the public API for now.
+  static class NestedFieldWithInitialDefault extends Types.NestedField {
+    private final Object initialDefault;
+
+    NestedFieldWithInitialDefault(
+        boolean isOptional,
+        int fieldId,
+        String name,
+        Type type,
+        String doc,
+        Object initialDefault,
+        Object writeDefault) {
+      super(isOptional, fieldId, name, type, doc, initialDefault, writeDefault);
+      this.initialDefault = initialDefault;
+    }
+
+    static Types.NestedField optionalWithDefault(
+        int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) {
+      return new NestedFieldWithInitialDefault(
+          true, id, name, type, doc, initialDefault, writeDefault);
+    }
+
+    @Override
+    public Object initialDefault() {
+      return initialDefault;
+    }
+  }
 }
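A condensed usage sketch of the helper above; the int default of 34 is illustrative, and because the helper is package-private this only compiles alongside the test in org.apache.iceberg.data.avro:

```java
package org.apache.iceberg.data.avro;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DefaultedReaderSchemaSketch {
  public static void main(String[] args) {
    // Build a reader-side field that carries an initial (read) default of 34.
    Types.NestedField col2 =
        TestReadDefaultValues.NestedFieldWithInitialDefault.optionalWithDefault(
            1000, "col2", Types.IntegerType.get(), null, 34, 34);

    Schema readerSchema =
        new Schema(Types.NestedField.required(999, "col1", Types.IntegerType.get()), col2);

    // Readers consult initialDefault() when a data file predates col2.
    System.out.println(col2.initialDefault()); // 34
    System.out.println(readerSchema.findField(1000).name()); // col2
  }
}
```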
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index 00679a165f1f..86404959735a 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -83,7 +83,7 @@ private ReadBuilder(Map<Integer, ?> idToConstant) {
     @Override
     public ValueReader<RowData> record(
         Types.StructType expected, Schema record, List<String> names, List<ValueReader<?>> fields) {
-      return FlinkValueReaders.struct(fields, expected.asStructType(), record, idToConstant);
+      return FlinkValueReaders.struct(fields, expected.asStructType(), idToConstant);
     }
 
     @Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
index f55620928d63..32f6c3a2ccfd 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
@@ -24,7 +24,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import org.apache.avro.Schema;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
 import org.apache.flink.table.data.ArrayData;
@@ -88,8 +87,8 @@ static ValueReader<MapData> map(ValueReader<?> keyReader, ValueReader<?> valueRe
   }
 
   static ValueReader<RowData> struct(
-      List<ValueReader<?>> readers, Types.StructType struct, Schema record, Map<Integer, ?> idToConstant) {
-    return new StructReader(readers, struct, record, idToConstant);
+      List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+    return new StructReader(readers, struct, idToConstant);
   }
 
   private static class StringReader implements ValueReader<StringData> {
@@ -287,8 +286,8 @@ private static class StructReader extends ValueReaders.StructReader<RowData> {
     private final int numFields;
 
     private StructReader(
-        List<ValueReader<?>> readers, Types.StructType struct, Schema record, Map<Integer, ?> idToConstant) {
-      super(readers, struct, record, idToConstant);
+        List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
       this.numFields = readers.size();
     }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index 8caaffd00bdb..4622d2928ac4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -83,7 +83,7 @@ private ReadBuilder(Map<Integer, ?> idToConstant) {
     @Override
     public ValueReader<InternalRow> record(
         Types.StructType expected, Schema record, List<String> names, List<ValueReader<?>> fields) {
-      return SparkValueReaders.struct(fields, expected, record, idToConstant);
+      return SparkValueReaders.struct(fields, expected, idToConstant);
     }
 
     @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index 76068c3103fa..11655c72d857 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -26,7 +26,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import org.apache.avro.Schema;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.ValueReader;
@@ -76,8 +75,8 @@ static ValueReader<ArrayBasedMapData> map(ValueReader<?> keyReader, ValueReader<
   }
 
   static ValueReader<InternalRow> struct(
-      List<ValueReader<?>> readers, Types.StructType struct, Schema record, Map<Integer, ?> idToConstant) {
-    return new StructReader(readers, struct, record, idToConstant);
+      List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+    return new StructReader(readers, struct, idToConstant);
  }
 
  private static class StringReader implements ValueReader<UTF8String> {
@@ -258,8 +257,8 @@ static class StructReader extends ValueReaders.StructReader<InternalRow> {
     private final int numFields;
 
     protected StructReader(
-        List<ValueReader<?>> readers, Types.StructType struct, Schema record, Map<Integer, ?> idToConstant) {
-      super(readers, struct, record, idToConstant);
+        List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
       this.numFields = readers.size();
     }