
Data: Support reading default values from generic Avro readers #6004

4 changes: 4 additions & 0 deletions .palantir/revapi.yml
@@ -406,6 +406,10 @@ acceptedBreaks:
justification: "Removing deprecations for 1.2.0"
"1.2.0":
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.types.Types.NestedField"
new: "class org.apache.iceberg.types.Types.NestedField"
justification: "Serialization across versions is not supported"
- code: "java.field.constantValueChanged"
old: "field org.apache.iceberg.actions.RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT"
new: "field org.apache.iceberg.actions.RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT"
14 changes: 12 additions & 2 deletions api/src/main/java/org/apache/iceberg/types/PruneColumns.java
@@ -68,11 +68,21 @@ public Type struct(Types.StructType struct, List<Type> fieldResults) {
if (field.isOptional()) {
selectedFields.add(
Types.NestedField.optional(
field.fieldId(), field.name(), projectedType, field.doc()));
field.fieldId(),
field.name(),
projectedType,
field.doc(),
field.initialDefault(),
field.writeDefault()));
} else {
selectedFields.add(
Types.NestedField.required(
field.fieldId(), field.name(), projectedType, field.doc()));
field.fieldId(),
field.name(),
projectedType,
field.doc(),
field.initialDefault(),
field.writeDefault()));
}
}
}
64 changes: 54 additions & 10 deletions api/src/main/java/org/apache/iceberg/types/Types.java
@@ -413,43 +413,75 @@ public int hashCode() {

public static class NestedField implements Serializable {
public static NestedField optional(int id, String name, Type type) {
return new NestedField(true, id, name, type, null);
return new NestedField(true, id, name, type, null, null, null);
}

public static NestedField optional(int id, String name, Type type, String doc) {
return new NestedField(true, id, name, type, doc);
return new NestedField(true, id, name, type, doc, null, null);
}

public static NestedField optional(
rdblue (Contributor), Oct 23, 2022:
I thought we talked about making these package-private until the implementation was complete so that it doesn't leak into the public API before the values are read correctly? Is there some reason why that wouldn't work?

rzhang10 (Contributor, Author), Oct 26, 2022:
The reason is that I need to instantiate a NestedField with a default value in the TestReadDefaultValues class, but that class lives in the iceberg-data module, which is not in the same package as Types.java in the API module, so package-private doesn't work.

I can annotate this method as VisibleForTesting and/or Beta, do you think that works?

Contributor:
Can you move the test class? We can't merge this unless these are not visible. Annotations don't change visibility.

Contributor:
It is hard to move the test class to another package, since all the Avro dependencies would break in that case. I have updated the PR to make both the optional factory method and the NestedField constructor protected and to inherit from NestedField in the test class. However, that carries the risk of having to downgrade the NestedField constructor back to private once this workaround is no longer needed.
Another option is to keep both as private and use reflection in the test to access them as a temporary workaround.

Contributor:
Kept the constructor as private and the newly introduced NestedField APIs as public.

int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) {
return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
}

public static NestedField required(int id, String name, Type type) {
return new NestedField(false, id, name, type, null);
return new NestedField(false, id, name, type, null, null, null);
}

public static NestedField required(int id, String name, Type type, String doc) {
return new NestedField(false, id, name, type, doc);
return new NestedField(false, id, name, type, doc, null, null);
}

public static NestedField required(
int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) {
return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
}

public static NestedField of(int id, boolean isOptional, String name, Type type) {
return new NestedField(isOptional, id, name, type, null);
return new NestedField(isOptional, id, name, type, null, null, null);
}

public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) {
return new NestedField(isOptional, id, name, type, doc);
return new NestedField(isOptional, id, name, type, doc, null, null);
}

public static NestedField of(
int id,
boolean isOptional,
String name,
Type type,
String doc,
Object initialDefault,
Object writeDefault) {
return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault);
}

private final boolean isOptional;
private final int id;
private final String name;
private final Type type;
private final String doc;

private NestedField(boolean isOptional, int id, String name, Type type, String doc) {
private final Object initialDefault;
private final Object writeDefault;

public NestedField(
boolean isOptional,
int id,
String name,
Type type,
String doc,
Object initialDefault,
Object writeDefault) {
Preconditions.checkNotNull(name, "Name cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
this.isOptional = isOptional;
this.id = id;
this.name = name;
this.type = type;
this.doc = doc;
this.initialDefault = initialDefault;
this.writeDefault = writeDefault;
}

public boolean isOptional() {
@@ -460,7 +492,7 @@ public NestedField asOptional() {
if (isOptional) {
return this;
}
return new NestedField(true, id, name, type, doc);
return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
}

public boolean isRequired() {
@@ -471,7 +503,11 @@ public NestedField asRequired() {
if (!isOptional) {
return this;
}
return new NestedField(false, id, name, type, doc);
return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
}

public NestedField withWriteDefault(Object newWriteDefault) {
return new NestedField(isOptional, id, name, type, doc, initialDefault, newWriteDefault);
}

public int fieldId() {
@@ -490,6 +526,14 @@ public String doc() {
return doc;
}

public Object initialDefault() {
return initialDefault;
}

public Object writeDefault() {
return writeDefault;
}

@Override
public String toString() {
return String.format("%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type)
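For orientation, here is a minimal sketch of how the new factory overloads above might be used to declare a column with both kinds of defaults. The field IDs, names, and default values are invented for illustration, and the six-argument required overload is assumed to be the one added in this diff:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DefaultValueSchemaExample {
  public static void main(String[] args) {
    // "region" is a column added after some data files were already written:
    // initialDefault is what readers fill in for rows from older files, while
    // writeDefault is what writers that omit the column are expected to use.
    Types.NestedField region =
        Types.NestedField.required(
            2, "region", Types.StringType.get(), "sales region", "unknown", "n/a");

    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            region);

    System.out.println(schema);
  }
}

Under the intended semantics, rows read from files written before "region" existed would surface "unknown" (the initial default), while new writes that omit the column would persist "n/a" (the write default).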
@@ -103,9 +103,17 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s

} else {
Preconditions.checkArgument(
field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
(field.isRequired() && field.initialDefault() != null)
|| field.isOptional()
|| MetadataColumns.metadataFieldIds().contains(field.fieldId()),
"Missing required field: %s",
field.name());
// If the field has an initial default value, do not project it in the read schema.
// The default will be added in {@link org.apache.iceberg.avro.ValueReader}.
if (field.initialDefault() != null) {
Contributor:
Why does the case for a null default add a field while this one does not? That seems strange to me. I think this should either add a fake field in both cases or not.

I think the question hinges on why the fake field is added for the other case. I think it has something to do with the record schema needing to match the Iceberg schema exactly -- the record schema should not be different. If that's the case, then it should also be added for the non-null initial default.

Contributor Author:
I see. I originally thought using continue was just an implementation trick: although the output Avro schema produced by BuildAvroProjection lacks the field, that is taken care of when the schemas are compared again in the ValueReader.

I think I can make the code logic more consistent by still creating a fake field with some other suffix, like "_d" instead of "_r". The "_d" field will not be projected from the real file either, and the default value read is still handled by the ValueReader. WDYT?

Contributor:
What is the purpose of changing the name of the placeholder field that is added? Just use the same code that's already here.

Contributor:
I think it depends on the implementation of ValueReaders. In the case of defaults, ValueReaders takes care of filling in the default value. In the case of optionals, it is delegated to Avro to fill in the null values. I like the idea of doing it in the ValueReaders so it is consistent across all file formats, even when the file format does not support the semantic (e.g., the ORC schema does not support default values).

continue;
}
rdblue marked this conversation as resolved.

// Create a field that will be defaulted to null. We assign a unique suffix to the field
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField =
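To make the trade-off discussed above concrete, here is a rough, self-contained sketch of the projection decision being debated. It is not the PR's actual BuildAvroProjection code; the enum and method names are invented for illustration:

public class ProjectionDecisionSketch {
  enum Action { PROJECT, SKIP_READER_FILLS_DEFAULT, ADD_NULL_PLACEHOLDER }

  // Decide what to do with an expected field that may be missing from the data file.
  static Action decide(boolean presentInFile, boolean optional, Object initialDefault) {
    if (presentInFile) {
      return Action.PROJECT;                    // read the value normally
    } else if (initialDefault != null) {
      return Action.SKIP_READER_FILLS_DEFAULT;  // the ValueReader supplies the default
    } else if (optional) {
      return Action.ADD_NULL_PLACEHOLDER;       // Avro resolves the suffixed placeholder to null
    }
    throw new IllegalArgumentException("Missing required field with no default");
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false, "unknown")); // SKIP_READER_FILLS_DEFAULT
    System.out.println(decide(false, true, null));       // ADD_NULL_PLACEHOLDER
  }
}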
80 changes: 63 additions & 17 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.avro.Schema;
@@ -41,8 +42,10 @@
import org.apache.avro.util.Utf8;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;

@@ -566,8 +569,10 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {

public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
private final ValueReader<?>[] readers;
private final int[] positions;
private final Object[] constants;
private final int[] constantPositions;
private final Object[] constantValues;
private final int[] defaultPositions;
private final Object[] defaultValues;
Contributor:
If we go with the approach in #9366, then we should no longer need changes here, which was what originally made this difficult.

private int posField = -1;

protected StructReader(List<ValueReader<?>> readers, Schema schema) {
@@ -586,37 +591,68 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
}

if (isDeletedColumnPos == null) {
this.positions = new int[0];
this.constants = new Object[0];
this.constantPositions = new int[0];
this.constantValues = new Object[0];
} else {
this.positions = new int[] {isDeletedColumnPos};
this.constants = new Object[] {false};
this.constantPositions = new int[] {isDeletedColumnPos};
this.constantValues = new Object[] {false};
}
this.defaultPositions = new int[0];
this.defaultValues = new Object[0];
}

protected StructReader(
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
this(readers, struct, idToConstant, ImmutableMap.of());
}

protected StructReader(
List<ValueReader<?>> readers,
Types.StructType struct,
Map<Integer, ?> idToConstant,
Map<Integer, ?> idToDefault) {
this.readers = readers.toArray(new ValueReader[0]);

List<Types.NestedField> fields = struct.fields();
List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> constantPositionsList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> defaultPositionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> defaultValuesList = Lists.newArrayListWithCapacity(fields.size());
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
if (idToConstant.containsKey(field.fieldId())) {
positionList.add(pos);
constantList.add(idToConstant.get(field.fieldId()));
constantPositionsList.add(pos);
constantValuesList.add(idToConstant.get(field.fieldId()));
} else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
} else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
positionList.add(pos);
constantList.add(false);
constantPositionsList.add(pos);
constantValuesList.add(false);
} else if (field.initialDefault() != null) {
if (idToDefault.containsKey(field.fieldId())) {
// Add a constant value for fields that have a default value. In the {@link #read()}
// method, this will be leveraged only if there is no corresponding reader.
defaultPositionList.add(pos);
defaultValuesList.add(idToDefault.get(field.fieldId()));
} else {
// Throw an exception if the map does not contain a default value for that field.
throw new UnsupportedOperationException(
"Default value not found for field: "
+ field.name()
+ " (field ID: "
+ field.fieldId()
+ ").");
}
}
}

this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
this.constants = constantList.toArray();
this.constantPositions = constantPositionsList.stream().mapToInt(Integer::intValue).toArray();
this.constantValues = constantValuesList.toArray();
this.defaultPositions = defaultPositionList.stream().mapToInt(Integer::intValue).toArray();
this.defaultValues = defaultValuesList.toArray();
}

@Override
@@ -654,10 +690,20 @@ public S read(Decoder decoder, Object reuse) throws IOException {
S struct = reuseOrCreate(reuse);

if (decoder instanceof ResolvingDecoder) {
// this may not set all of the fields. nulls are set by default.
Set<Integer> existingFieldPositionsSet = Sets.newHashSet();
// This may not set all of the fields. nulls are set by default.
for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
Object reusedValue = get(struct, field.pos());
set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
existingFieldPositionsSet.add(field.pos());
}

// Set default values
Contributor:
Style: missing empty newline between control flow block and the following statement.

for (int i = 0; i < defaultPositions.length; i += 1) {
// Set default values only if the field does not exist in the data.
if (!existingFieldPositionsSet.contains(defaultPositions[i])) {
set(struct, defaultPositions[i], defaultValues[i]);
}
}

Contributor:
Nit: unnecessary whitespace change.

} else {
@@ -667,8 +713,8 @@ public S read(Decoder decoder, Object reuse) throws IOException {
}
}

for (int i = 0; i < positions.length; i += 1) {
set(struct, positions[i], constants[i]);
for (int i = 0; i < constantPositions.length; i += 1) {
set(struct, constantPositions[i], constantValues[i]);
}

return struct;
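The read-path idea in the hunk above, reduced to a standalone sketch: record which positions the resolving decoder actually populated, then back-fill defaults only for positions the file did not provide. The arrays and values are illustrative, not the PR's actual data structures:

import java.util.HashSet;
import java.util.Set;

public class DefaultBackfillSketch {
  public static void main(String[] args) {
    Object[] struct = new Object[3];      // stands in for the reused record
    int[] defaultPositions = {2};         // positions of fields that carry an initial default
    Object[] defaultValues = {"unknown"};

    // Positions the decoder actually read (e.g., an old file that lacks column 2).
    Set<Integer> readPositions = new HashSet<>();
    struct[0] = 1L;
    readPositions.add(0);
    struct[1] = "a";
    readPositions.add(1);

    // Back-fill defaults only where the file had no value.
    for (int i = 0; i < defaultPositions.length; i += 1) {
      if (!readPositions.contains(defaultPositions[i])) {
        struct[defaultPositions[i]] = defaultValues[i];
      }
    }

    System.out.println(java.util.Arrays.toString(struct)); // [1, a, unknown]
  }
}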
@@ -18,9 +18,11 @@
*/
package org.apache.iceberg.data;

import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;

public class IdentityPartitionConverters {
@@ -46,7 +48,9 @@ public static Object convertConstant(Type type, Object value) {
return DateTimeUtil.timestampFromMicros((Long) value);
}
case FIXED:
if (value instanceof GenericData.Fixed) {
if (value instanceof ByteBuffer) {
return ByteBuffers.toByteArray((ByteBuffer) value);
} else if (value instanceof GenericData.Fixed) {
return ((GenericData.Fixed) value).bytes();
}
return value;
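The new FIXED branch above converts a ByteBuffer default into a byte array via ByteBuffers.toByteArray. For readers less familiar with buffer semantics, a plain-Java sketch of an equivalent copy (whether the real utility avoids the copy for array-backed buffers is an assumption here):

import java.nio.ByteBuffer;
import java.util.Arrays;

public class FixedDefaultConversionSketch {
  // Copy the readable bytes of a buffer without disturbing its position.
  static byte[] toByteArray(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer fixedDefault = ByteBuffer.wrap(new byte[] {0x0A, 0x0B, 0x0C});
    System.out.println(Arrays.toString(toByteArray(fixedDefault))); // [10, 11, 12]
  }
}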
@@ -29,7 +29,10 @@
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.DateTimeUtil;

@@ -106,7 +109,7 @@ private static class GenericRecordReader extends ValueReaders.StructReader<Recor

private GenericRecordReader(
List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
super(readers, struct, idToConstant, idToDefault(struct));
this.structType = struct;
}

@@ -128,5 +131,17 @@ protected Object get(Record struct, int pos) {
protected void set(Record struct, int pos, Object value) {
struct.set(pos, value);
}

private static Map<Integer, ?> idToDefault(StructType struct) {
Map<Integer, Object> result = Maps.newHashMap();
for (Types.NestedField field : struct.fields()) {
if (field.initialDefault() != null) {
result.put(
field.fieldId(),
IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault()));
}
}
return result;
}
}
}
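Putting the pieces together, a hedged sketch of what the idToDefault helper above computes for a struct whose field carries a ByteBuffer default. The schema, field ID, and default value are invented, and the seven-argument optional overload is assumed from this diff:

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;

public class IdToDefaultSketch {
  public static void main(String[] args) {
    // A fixed-width column whose initial default is stored as a ByteBuffer.
    Types.NestedField checksum =
        Types.NestedField.optional(
            3,
            "checksum",
            Types.FixedType.ofLength(3),
            "fixed-width checksum",
            ByteBuffer.wrap(new byte[] {0x0A, 0x0B, 0x0C}),
            null);

    // Mirror of the idToDefault helper: convert each raw default to the representation
    // generic records use (for FIXED, the new branch turns a ByteBuffer into byte[]).
    Map<Integer, Object> idToDefault = Maps.newHashMap();
    if (checksum.initialDefault() != null) {
      idToDefault.put(
          checksum.fieldId(),
          IdentityPartitionConverters.convertConstant(checksum.type(), checksum.initialDefault()));
    }

    System.out.println(Arrays.toString((byte[]) idToDefault.get(3))); // [10, 11, 12]
  }
}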