Simplify some APIs and address review comments

wmoustafa committed Apr 2, 2023
1 parent f5f4dbb commit 1b036d9
Showing 13 changed files with 122 additions and 75 deletions.
16 changes: 10 additions & 6 deletions .palantir/revapi.yml
@@ -7,6 +7,10 @@ acceptedBreaks:
old: "class org.apache.iceberg.PartitionKey"
new: "class org.apache.iceberg.PartitionKey"
justification: "Serialization across versions is not supported"
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.types.Types.NestedField"
new: "class org.apache.iceberg.types.Types.NestedField"
justification: "Serialization across versions is not supported"
- code: "java.class.removed"
old: "interface org.apache.iceberg.Rollback"
justification: "Deprecations for 1.0 release"
@@ -16,9 +20,15 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.StatisticsFile> org.apache.iceberg.Table::statisticsFiles()"
justification: "new API method"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
justification: "Adding table scan APIs to support scanning from refs"
- code: "java.method.removed"
old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::addedFiles()"
justification: "Deprecations for 1.0 release"
- code: "java.method.removed"
old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::deletedFiles()"
justification: "Deprecations for 1.0 release"
- code: "java.method.removed"
old: "method java.util.List<org.apache.iceberg.ManifestFile> org.apache.iceberg.Snapshot::allManifests()"
justification: "Deprecations for 1.0 release"
@@ -28,9 +38,6 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method java.util.List<org.apache.iceberg.ManifestFile> org.apache.iceberg.Snapshot::deleteManifests()"
justification: "Deprecations for 1.0 release"
- code: "java.method.removed"
old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::deletedFiles()"
justification: "Deprecations for 1.0 release"
- code: "java.method.removed"
old: "method org.apache.iceberg.OverwriteFiles org.apache.iceberg.OverwriteFiles::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
justification: "Deprecations for 1.0 release"
@@ -40,9 +47,6 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method org.apache.iceberg.RowDelta org.apache.iceberg.RowDelta::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
justification: "Deprecations for 1.0 release"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
justification: "Adding table scan APIs to support scanning from refs"
release-base-0.13.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
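Among the accepted breaks above is the new TableScan::useRef(String) method, added to support scanning from refs. A minimal sketch of its intended use (the catalog wiring, table identifier, and branch name are illustrative assumptions, not part of this commit):

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;

public class UseRefExample {
  // Plan a scan pinned to a named ref (a branch or tag) instead of the current snapshot.
  static void scanBranch(Catalog catalog) throws Exception {
    Table table = catalog.loadTable(TableIdentifier.of("db", "events")); // hypothetical table
    TableScan scan = table.newScan().useRef("audit-branch");             // hypothetical ref name
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path());
      }
    }
  }
}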
4 changes: 2 additions & 2 deletions api/src/main/java/org/apache/iceberg/types/Types.java
@@ -419,7 +419,7 @@ public static NestedField optional(int id, String name, Type type, String doc) {
return new NestedField(true, id, name, type, doc, null, null);
}

public static NestedField optional(
protected static NestedField optional(
int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) {
return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
}
@@ -464,7 +464,7 @@ public static NestedField of(
private final Object initialDefault;
private final Object writeDefault;

private NestedField(
protected NestedField(
boolean isOptional,
int id,
String name,
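These two visibility changes work together: optional(id, name, type, doc) stays public, while the overload taking initialDefault and writeDefault becomes protected and the constructor widens from private to protected, so for now defaults can only be set through a subclass. A minimal sketch of that pattern, mirroring the NestedFieldWithInitialDefault test helper added later in this commit (the class name here is illustrative):

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Illustrative subclass: routes default values through the now-protected constructor.
class FieldWithDefault extends Types.NestedField {
  private final Object initialDefault;

  FieldWithDefault(int id, String name, Type type, Object initialDefault) {
    super(true /* optional */, id, name, type, null /* no doc */, initialDefault, initialDefault);
    this.initialDefault = initialDefault;
  }

  @Override
  public Object initialDefault() {
    return initialDefault;
  }
}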
27 changes: 19 additions & 8 deletions core/src/main/java/org/apache/iceberg/SingleValueParser.java
@@ -153,7 +153,7 @@ public static Object fromJson(Type type, JsonNode defaultValue) {
defaultLength);
byte[] fixedBytes =
BaseEncoding.base16().decode(defaultValue.textValue().toUpperCase(Locale.ROOT));
return ByteBuffer.wrap(fixedBytes);
return fixedBytes;
case BINARY:
Preconditions.checkArgument(
defaultValue.isTextual(), "Cannot parse default as a %s value: %s", type, defaultValue);
@@ -242,7 +242,7 @@ public static String toJson(Type type, Object defaultValue, boolean pretty) {
return JsonUtil.generate(gen -> toJson(type, defaultValue, gen), pretty);
}

@SuppressWarnings("checkstyle:MethodLength")
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:CyclomaticComplexity"})
public static void toJson(Type type, Object defaultValue, JsonGenerator generator)
throws IOException {
if (defaultValue == null) {
@@ -309,17 +309,28 @@ public static void toJson(Type type, Object defaultValue, JsonGenerator generator)
generator.writeString(defaultValue.toString());
break;
case FIXED:
// Normally, FIXED is backed by a byte[], but it can also be backed by a ByteBuffer in the
// case of Fixed Literals. Ideally, Fixed Literals would be backed by a byte[], but that
// would make the APIs backwards incompatible.
// See {@link org.apache.iceberg.expressions.Literals.FixedLiteral}.
Preconditions.checkArgument(
defaultValue instanceof ByteBuffer, "Invalid default %s value: %s", type, defaultValue);
ByteBuffer byteBufferValue = (ByteBuffer) defaultValue;
defaultValue instanceof byte[] || defaultValue instanceof ByteBuffer,
"Invalid default %s value: %s",
type,
defaultValue);
byte[] byteArrayValue;
if (defaultValue instanceof ByteBuffer) {
byteArrayValue = ByteBuffers.toByteArray((ByteBuffer) defaultValue);
} else {
byteArrayValue = (byte[]) defaultValue;
}
int expectedLength = ((Types.FixedType) type).length();
Preconditions.checkArgument(
byteBufferValue.remaining() == expectedLength,
byteArrayValue.length == expectedLength,
"Invalid default %s value, incorrect length: %s",
type,
byteBufferValue.remaining());
generator.writeString(
BaseEncoding.base16().encode(ByteBuffers.toByteArray(byteBufferValue)));
byteArrayValue.length);
generator.writeString(BaseEncoding.base16().encode(byteArrayValue));
break;
case BINARY:
Preconditions.checkArgument(
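With these changes, fromJson returns a bare byte[] for FIXED while toJson accepts either a byte[] or a ByteBuffer. A round-trip sketch using the methods shown in this diff (JsonUtil.mapper() is assumed here as the project's shared ObjectMapper):

import java.nio.ByteBuffer;
import org.apache.iceberg.SingleValueParser;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;

public class FixedDefaultRoundTrip {
  public static void main(String[] args) throws Exception {
    Types.FixedType fixed3 = Types.FixedType.ofLength(3);

    // fromJson now returns byte[] for FIXED (previously it wrapped the bytes in a ByteBuffer)
    byte[] parsed =
        (byte[]) SingleValueParser.fromJson(fixed3, JsonUtil.mapper().readTree("\"010203\""));

    // toJson accepts both representations and produces the same JSON string
    String fromArray = SingleValueParser.toJson(fixed3, parsed, false);
    String fromBuffer = SingleValueParser.toJson(fixed3, ByteBuffer.wrap(parsed), false);
    System.out.println(fromArray.equals(fromBuffer)); // true
  }
}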
@@ -105,14 +105,14 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
(field.isRequired() && field.initialDefault() != null)
|| field.isOptional()
|| MetadataColumns.metadataFieldIds().contains(field.fieldId()),
"Missing required field that doesn't have a default value: %s",
"Missing required field: %s",
field.name());
// If the field from Iceberg schema has initial default value, we just pass and don't
// project it to the avro file read schema with the generated _r field name,
// the default value will be directly read from the Iceberg layer
// If the field has an initial default value, do not project it in the read schema.
// The default will be added in {@link org.apache.iceberg.avro.ValueReader}.
if (field.initialDefault() != null) {
continue;
}

// Create a field that will be defaulted to null. We assign a unique suffix to the field
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField =
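The rule above, restated: a field missing from the data file is acceptable when it is optional, is a metadata column, or carries an initial default, and defaulted fields are deliberately left out of the read schema so the value readers supply the default instead. A compact sketch of that check (not the visitor's exact code):

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;

class ProjectionRuleSketch {
  // Validates a field that is absent from the file's schema.
  static void checkMissingField(Types.NestedField field) {
    Preconditions.checkArgument(
        field.isOptional()
            || field.initialDefault() != null
            || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
        "Missing required field: %s",
        field.name());
  }
}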
63 changes: 39 additions & 24 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -566,8 +566,10 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {

public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
private final ValueReader<?>[] readers;
private final int[] positions;
private final Object[] constants;
private final int[] constantValuesPositions;
private final Object[] constantValues;
private final int[] defaultValuesPositions;
private final Object[] defaultValues;
private int posField = -1;

protected StructReader(List<ValueReader<?>> readers, Schema schema) {
@@ -586,43 +588,51 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
}

if (isDeletedColumnPos == null) {
this.positions = new int[0];
this.constants = new Object[0];
this.constantValuesPositions = new int[0];
this.constantValues = new Object[0];
} else {
this.positions = new int[] {isDeletedColumnPos};
this.constants = new Object[] {false};
this.constantValuesPositions = new int[] {isDeletedColumnPos};
this.constantValues = new Object[] {false};
}
this.defaultValuesPositions = new int[0];
this.defaultValues = new Object[0];
}

protected StructReader(
List<ValueReader<?>> readers,
Types.StructType struct,
Schema record,
Map<Integer, ?> idToConstant) {
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
this.readers = readers.toArray(new ValueReader[0]);

List<Types.NestedField> fields = struct.fields();
List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> constantValuesPositionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> defaultValuesList = Lists.newArrayListWithCapacity(fields.size());
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
if (idToConstant.containsKey(field.fieldId())) {
positionList.add(pos);
constantList.add(idToConstant.get(field.fieldId()));
constantValuesPositionList.add(pos);
constantValuesList.add(idToConstant.get(field.fieldId()));
} else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
} else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
positionList.add(pos);
constantList.add(false);
} else if (record.getField(field.name()) == null && field.initialDefault() != null) {
positionList.add(pos);
constantList.add(field.initialDefault());
constantValuesPositionList.add(pos);
constantValuesList.add(false);
} else if (field.initialDefault() != null) {
// Add a constant value for fields that have a default value.
// In the {@link #read()} method, this will be leveraged only if there is no corresponding
// reader.
defaultValuesPositionList.add(pos);
defaultValuesList.add(field.initialDefault());
}
}

this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
this.constants = constantList.toArray();
this.constantValuesPositions =
constantValuesPositionList.stream().mapToInt(Integer::intValue).toArray();
this.constantValues = constantValuesList.toArray();
this.defaultValuesPositions =
defaultValuesPositionList.stream().mapToInt(Integer::intValue).toArray();
this.defaultValues = defaultValuesList.toArray();
}

@Override
@@ -659,22 +669,27 @@ public ValueReader<?> reader(int pos) {
public S read(Decoder decoder, Object reuse) throws IOException {
S struct = reuseOrCreate(reuse);

// Set the default values first. Setting default values first allows them to be overridden
// once the data is read from the file if the corresponding field is present in the file.
for (int i = 0; i < defaultValuesPositions.length; i += 1) {
set(struct, defaultValuesPositions[i], defaultValues[i]);
}

if (decoder instanceof ResolvingDecoder) {
// this may not set all of the fields. nulls are set by default.
for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
Object reusedValue = get(struct, field.pos());
set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
}

} else {
for (int i = 0; i < readers.length; i += 1) {
Object reusedValue = get(struct, i);
set(struct, i, readers[i].read(decoder, reusedValue));
}
}

for (int i = 0; i < positions.length; i += 1) {
set(struct, positions[i], constants[i]);
for (int i = 0; i < constantValuesPositions.length; i += 1) {
set(struct, constantValuesPositions[i], constantValues[i]);
}

return struct;
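The ordering in read() is deliberate: defaults are set first so that values actually present in the file overwrite them, and constants such as _is_deleted are stamped last so they always win. A standalone sketch of that layering, with plain maps standing in for Avro decoders:

import java.util.HashMap;
import java.util.Map;

public class ReadOrderSketch {
  public static void main(String[] args) {
    Map<String, Object> struct = new HashMap<>();
    Map<String, Object> defaults = Map.of("b", 34);               // initialDefault values
    Map<String, Object> fileData = Map.of("a", 1);                // columns present in the file
    Map<String, Object> constants = Map.of("_is_deleted", false); // metadata constants

    struct.putAll(defaults);  // step 1: defaults first, so file data can override them
    struct.putAll(fileData);  // step 2: values read from the file
    struct.putAll(constants); // step 3: constants last, so they always win
    System.out.println(struct); // e.g. {a=1, b=34, _is_deleted=false}
  }
}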
@@ -113,7 +113,7 @@ private GenericRecordReader(
StructType struct,
Schema record,
Map<Integer, ?> idToConstant) {
super(readers, struct, record, idToConstant);
super(readers, struct, idToConstant);
this.structType = struct;
}

@@ -205,7 +205,7 @@ public void testMissingRequiredFields() {
AssertHelpers.assertThrows(
"Missing required field in nameMapping",
IllegalArgumentException.class,
"Missing required field that doesn't have a default value: x",
"Missing required field: x",
// In this case, pruneColumns result is an empty record
() -> writeAndRead(writeSchema, readSchema, record, nameMapping));
}
12 changes: 0 additions & 12 deletions data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
@@ -18,12 +18,10 @@
*/
package org.apache.iceberg.data;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.assertj.core.api.Assertions;
import org.junit.Assert;

@@ -90,16 +88,6 @@ private static void assertEquals(Type type, Object expected, Object actual) {
"Primitive value should be equal to expected for type " + type, expected, actual);
break;
case FIXED:
// For fixed type, Iceberg actually represent value as Bytebuffer,
// but since RandomGenericData generates bytearray data for fixed type
// for it to be written to Avro, we add a conversion here to make the
// equality comparison consistent using bytearray
if (expected instanceof ByteBuffer) {
expected = ByteBuffers.toByteArray((ByteBuffer) expected);
}
if (actual instanceof ByteBuffer) {
actual = ByteBuffers.toByteArray((ByteBuffer) actual);
}
Assertions.assertThat(expected)
.as("Expected should be a byte[]")
.isInstanceOf(byte[].class);
@@ -125,15 +125,16 @@ public void writeAndValidate() throws IOException {
Schema readerSchema =
new Schema(
required(999, "col1", Types.IntegerType.get()),
optional(1000, "col2", type, null, defaultValue, defaultValue));
NestedFieldWithInitialDefault.optionalWithDefault(
1000, "col2", type, null, defaultValue, defaultValue));

List<Record> generatedRecords = RandomGenericData.generate(writerSchema, 100, 0L);
List<Record> expected = Lists.newArrayList();
for (Record record : generatedRecords) {
Record r = GenericRecord.create(readerSchema);
r.set(0, record.get(0));
r.set(1, defaultValue);
expected.add(r);
Record expectedRecord = GenericRecord.create(readerSchema);
expectedRecord.set(0, record.get(0));
expectedRecord.set(1, defaultValue);
expected.add(expectedRecord);
}

File testFile = temp.newFile();
@@ -164,4 +165,34 @@
}
}
}

// TODO: This class should be removed once NestedField.optional() that takes initialDefault and
// writeDefault becomes public. It is intentionally package private to avoid exposing it in the
// public API as of now.
static class NestedFieldWithInitialDefault extends Types.NestedField {
private final Object initialDefault;

NestedFieldWithInitialDefault(
boolean isOptional,
int fieldId,
String name,
Type type,
String doc,
Object initialDefault,
Object writeDefault) {
super(isOptional, fieldId, name, type, doc, initialDefault, writeDefault);
this.initialDefault = initialDefault;
}

static Types.NestedField optionalWithDefault(
int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) {
return new NestedFieldWithInitialDefault(
true, id, name, type, doc, initialDefault, writeDefault);
}

@Override
public Object initialDefault() {
return initialDefault;
}
}
}
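As the TODO notes, the helper is a stopgap: once the defaults-taking factory becomes public, the reader schema can again be built directly with the form this commit removed from the test, i.e. optional(1000, "col2", type, null, defaultValue, defaultValue).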
@@ -83,7 +83,7 @@ private ReadBuilder(Map<Integer, ?> idToConstant) {
@Override
public ValueReader<?> record(
Types.StructType expected, Schema record, List<String> names, List<ValueReader<?>> fields) {
return FlinkValueReaders.struct(fields, expected.asStructType(), record, idToConstant);
return FlinkValueReaders.struct(fields, expected.asStructType(), idToConstant);
}

@Override
@@ -24,7 +24,6 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.util.Utf8;
import org.apache.flink.table.data.ArrayData;
@@ -88,8 +87,8 @@ static ValueReader<MapData> map(ValueReader<?> keyReader, ValueReader<?> valueRe
}

static ValueReader<RowData> struct(
List<ValueReader<?>> readers, Types.StructType struct, Schema record, Map<Integer, ?> idToConstant) {
return new StructReader(readers, struct, record, idToConstant);
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
return new StructReader(readers, struct, idToConstant);
}

private static class StringReader implements ValueReader<StringData> {
@@ -287,8 +286,8 @@ private static class StructReader extends ValueReaders.StructReader<RowData> {
private final int numFields;

private StructReader(
List<ValueReader<?>> readers, Types.StructType struct, Schema record, Map<Integer, ?> idToConstant) {
super(readers, struct, record, idToConstant);
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
this.numFields = readers.size();
}
