Data: Support reading default values from generic Avro readers #6004
Changes from all commits
5b49ea7
6a3540c
cb5e31a
251e138
b20b765
70ae07b
a77c857
c9f9c27
0629c8a
7ccf868
057366b
@@ -103,9 +103,17 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s

       } else {
         Preconditions.checkArgument(
-            field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
+            (field.isRequired() && field.initialDefault() != null)
+                || field.isOptional()
+                || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
             "Missing required field: %s",
             field.name());
+        // If the field has an initial default value, do not project it in the read schema.
+        // The default will be added in {@link org.apache.iceberg.avro.ValueReader}.
+        if (field.initialDefault() != null) {

Why does the case for a [...] I think the question hinges on why the fake field is added for the other case. I think it has something to do with the record schema needing to match the Iceberg schema exactly -- the record schema should not be different. If that's the case then it should be added for the non-null initial default.

I see, I originally thought using [...] I think I can make the code logic more consistent by still creating a fake field with some other suffix like "_d" instead of "_r". The "_d" field will not be projected in the real file either, and the default value read is still taken care of by [...]

What is the purpose of changing the name of the placeholder field that is added? Just use the same code that's already here.

I think it depends on the implementation of [...]

+          continue;
+        }
rdblue marked this conversation as resolved.
+
         // Create a field that will be defaulted to null. We assign a unique suffix to the field
         // to make sure that even if records in the file have the field it is not projected.
         Schema.Field newField =
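The relaxed precondition in this hunk can be read as a three-way test for whether a field may be absent from the data file. The following is a minimal sketch under stated assumptions: `Field` is a hypothetical stand-in for `Types.NestedField`, and `METADATA_IDS` holds arbitrary placeholder IDs rather than the real contents of `MetadataColumns.metadataFieldIds()`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class MissingFieldCheck {
  /** Hypothetical stand-in for Types.NestedField; only the parts the check needs. */
  static final class Field {
    final int id;
    final boolean optional;
    final Object initialDefault; // null means the field has no initial default

    Field(int id, boolean optional, Object initialDefault) {
      this.id = id;
      this.optional = optional;
      this.initialDefault = initialDefault;
    }
  }

  // Arbitrary placeholder IDs; the real set comes from MetadataColumns.metadataFieldIds().
  static final Set<Integer> METADATA_IDS = new HashSet<>(Arrays.asList(1001, 1002));

  /** Returns true if a field missing from the file passes the relaxed check. */
  static boolean mayBeMissing(Field field) {
    boolean required = !field.optional;
    return (required && field.initialDefault != null)
        || field.optional
        || METADATA_IDS.contains(field.id);
  }

  public static void main(String[] args) {
    System.out.println(mayBeMissing(new Field(1, false, null))); // false: required, no default
    System.out.println(mayBeMissing(new Field(2, false, "x"))); // true: required with default
    System.out.println(mayBeMissing(new Field(3, true, null))); // true: optional
    System.out.println(mayBeMissing(new Field(1001, false, null))); // true: placeholder metadata ID
  }
}
```

Only the first clause is new in this diff; optional fields and metadata columns were already accepted before the change.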

@@ -31,6 +31,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Supplier;
 import org.apache.avro.Schema;
@@ -41,8 +42,10 @@
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.UUIDUtil;

@@ -566,8 +569,10 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {

   public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
     private final ValueReader<?>[] readers;
-    private final int[] positions;
-    private final Object[] constants;
+    private final int[] constantPositions;
+    private final Object[] constantValues;
+    private final int[] defaultPositions;
+    private final Object[] defaultValues;

If we go with the approach in #9366, then we should no longer need changes here, which was what originally made this difficult.

     private int posField = -1;

     protected StructReader(List<ValueReader<?>> readers, Schema schema) {
@@ -586,37 +591,68 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
       }

       if (isDeletedColumnPos == null) {
-        this.positions = new int[0];
-        this.constants = new Object[0];
+        this.constantPositions = new int[0];
+        this.constantValues = new Object[0];
       } else {
-        this.positions = new int[] {isDeletedColumnPos};
-        this.constants = new Object[] {false};
+        this.constantPositions = new int[] {isDeletedColumnPos};
+        this.constantValues = new Object[] {false};
       }
+      this.defaultPositions = new int[0];
+      this.defaultValues = new Object[0];
     }

     protected StructReader(
         List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      this(readers, struct, idToConstant, ImmutableMap.of());
+    }
+
+    protected StructReader(
+        List<ValueReader<?>> readers,
+        Types.StructType struct,
+        Map<Integer, ?> idToConstant,
+        Map<Integer, ?> idToDefault) {
       this.readers = readers.toArray(new ValueReader[0]);

       List<Types.NestedField> fields = struct.fields();
-      List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
-      List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
+      List<Integer> constantPositionsList = Lists.newArrayListWithCapacity(fields.size());
+      List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
+      List<Integer> defaultPositionList = Lists.newArrayListWithCapacity(fields.size());
+      List<Object> defaultValuesList = Lists.newArrayListWithCapacity(fields.size());
       for (int pos = 0; pos < fields.size(); pos += 1) {
         Types.NestedField field = fields.get(pos);
         if (idToConstant.containsKey(field.fieldId())) {
-          positionList.add(pos);
-          constantList.add(idToConstant.get(field.fieldId()));
+          constantPositionsList.add(pos);
+          constantValuesList.add(idToConstant.get(field.fieldId()));
         } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
           // track where the _pos field is located for setRowPositionSupplier
           this.posField = pos;
         } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
-          positionList.add(pos);
-          constantList.add(false);
+          constantPositionsList.add(pos);
+          constantValuesList.add(false);
+        } else if (field.initialDefault() != null) {
+          if (idToDefault.containsKey(field.fieldId())) {
+            // Add a constant value for fields that have a default value.
+            // In the {@link #read()} method, this will be leveraged only if there is no
+            // corresponding
+            // reader.
+            defaultPositionList.add(pos);
+            defaultValuesList.add(idToDefault.get(field.fieldId()));
+          } else {
+            // Throw an exception if the map does not contain a default value for that field.
+            throw new UnsupportedOperationException(
+                "Default value not found for field: "
+                    + field.name()
+                    + " (field ID: "
+                    + field.fieldId()
+                    + ").");
+          }
         }
       }

-      this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
-      this.constants = constantList.toArray();
+      this.constantPositions = constantPositionsList.stream().mapToInt(Integer::intValue).toArray();
+      this.constantValues = constantValuesList.toArray();
+      this.defaultPositions = defaultPositionList.stream().mapToInt(Integer::intValue).toArray();
+      this.defaultValues = defaultValuesList.toArray();
     }

     @Override
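The new constructor's bookkeeping can be illustrated without Avro or Iceberg on the classpath. This sketch assumes a hypothetical `Field` type in place of `Types.NestedField` and omits the `ROW_POSITION` and `IS_DELETED` branches; it only shows how positions are split into constants and defaults, and how a missing entry in `idToDefault` is rejected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DefaultPartitionSketch {
  /** Hypothetical stand-in for Types.NestedField. */
  static final class Field {
    final int id;
    final String name;
    final Object initialDefault; // null means no initial default

    Field(int id, String name, Object initialDefault) {
      this.id = id;
      this.name = name;
      this.initialDefault = initialDefault;
    }
  }

  final int[] constantPositions;
  final Object[] constantValues;
  final int[] defaultPositions;
  final Object[] defaultValues;

  DefaultPartitionSketch(
      List<Field> fields, Map<Integer, ?> idToConstant, Map<Integer, ?> idToDefault) {
    List<Integer> constantPositionList = new ArrayList<>();
    List<Object> constantValueList = new ArrayList<>();
    List<Integer> defaultPositionList = new ArrayList<>();
    List<Object> defaultValueList = new ArrayList<>();
    for (int pos = 0; pos < fields.size(); pos += 1) {
      Field field = fields.get(pos);
      if (idToConstant.containsKey(field.id)) {
        // Constants take precedence over defaults, as in the diff.
        constantPositionList.add(pos);
        constantValueList.add(idToConstant.get(field.id));
      } else if (field.initialDefault != null) {
        if (!idToDefault.containsKey(field.id)) {
          throw new UnsupportedOperationException(
              "Default value not found for field: " + field.name + " (field ID: " + field.id + ").");
        }
        defaultPositionList.add(pos);
        defaultValueList.add(idToDefault.get(field.id));
      }
    }
    this.constantPositions = constantPositionList.stream().mapToInt(Integer::intValue).toArray();
    this.constantValues = constantValueList.toArray();
    this.defaultPositions = defaultPositionList.stream().mapToInt(Integer::intValue).toArray();
    this.defaultValues = defaultValueList.toArray();
  }

  public static void main(String[] args) {
    List<Field> fields = new ArrayList<>();
    fields.add(new Field(1, "a", null)); // becomes a constant
    fields.add(new Field(2, "b", 42)); // becomes a default
    fields.add(new Field(3, "c", null)); // neither; read from the file
    Map<Integer, Object> constants = new HashMap<>();
    constants.put(1, "const");
    Map<Integer, Object> defaults = new HashMap<>();
    defaults.put(2, 42);
    DefaultPartitionSketch sketch = new DefaultPartitionSketch(fields, constants, defaults);
    System.out.println(sketch.constantPositions.length + " constant, "
        + sketch.defaultPositions.length + " default"); // prints "1 constant, 1 default"
  }
}
```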
@@ -654,10 +690,20 @@ public S read(Decoder decoder, Object reuse) throws IOException {
       S struct = reuseOrCreate(reuse);

       if (decoder instanceof ResolvingDecoder) {
-        // this may not set all of the fields. nulls are set by default.
+        Set<Integer> existingFieldPositionsSet = Sets.newHashSet();
+        // This may not set all of the fields. nulls are set by default.
         for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
           Object reusedValue = get(struct, field.pos());
           set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
+          existingFieldPositionsSet.add(field.pos());
         }

+        // Set default values

Style: missing empty newline between control flow block and the following statement.

+        for (int i = 0; i < defaultPositions.length; i += 1) {
+          // Set default values only if the field does not exist in the data.
+          if (!existingFieldPositionsSet.contains(defaultPositions[i])) {
+            set(struct, defaultPositions[i], defaultValues[i]);
+          }
+        }
+

Nit: unnecessary whitespace change.

       } else {
@@ -667,8 +713,8 @@ public S read(Decoder decoder, Object reuse) throws IOException {
         }
       }

-      for (int i = 0; i < positions.length; i += 1) {
-        set(struct, positions[i], constants[i]);
+      for (int i = 0; i < constantPositions.length; i += 1) {
+        set(struct, constantPositions[i], constantValues[i]);
       }

       return struct;
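How `read` combines the three sources of values may be easier to follow in a standalone form. The sketch below is a simplification under stated assumptions: a `Map<Integer, Object>` named `fileValues` stands in for the fields that the `ResolvingDecoder` actually yields, and a plain `Object[]` stands in for the struct. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ApplyDefaultsSketch {
  /**
   * Fills a row in the same order as the diff: values present in the file first,
   * then defaults for positions the file did not supply, then constants.
   */
  static Object[] read(
      int width,
      Map<Integer, Object> fileValues, // position -> decoded value (assumed input)
      int[] defaultPositions,
      Object[] defaultValues,
      int[] constantPositions,
      Object[] constantValues) {
    Object[] struct = new Object[width];
    Set<Integer> existingFieldPositions = new HashSet<>();
    for (Map.Entry<Integer, Object> entry : fileValues.entrySet()) {
      struct[entry.getKey()] = entry.getValue();
      existingFieldPositions.add(entry.getKey());
    }

    for (int i = 0; i < defaultPositions.length; i += 1) {
      // Apply a default only if the field does not exist in the data.
      if (!existingFieldPositions.contains(defaultPositions[i])) {
        struct[defaultPositions[i]] = defaultValues[i];
      }
    }

    for (int i = 0; i < constantPositions.length; i += 1) {
      struct[constantPositions[i]] = constantValues[i];
    }

    return struct;
  }

  public static void main(String[] args) {
    Map<Integer, Object> fileValues = new TreeMap<>();
    fileValues.put(0, "a");
    fileValues.put(1, "from-file");
    Object[] row =
        read(3, fileValues, new int[] {1, 2}, new Object[] {"d1", "d2"}, new int[0], new Object[0]);
    System.out.println(Arrays.toString(row)); // prints [a, from-file, d2]
  }
}
```

Position 1 has a default but keeps the value from the file, while position 2 is absent from the file and receives its default.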

I thought we talked about making these package-private until the implementation was complete so that it doesn't leak into the public API before the values are read correctly? Is there some reason why that wouldn't work?

The reason is that I need to instantiate a `NestedField` with a default value in the `TestReadDefaultValues` class, but this class lives in the iceberg-data module, which is not in the same package as `types.java` in the API module, so package-private doesn't work. I can annotate this method as `VisibleForTesting` and/or `Beta`; do you think that works?

Can you move the test class? We can't merge this unless these are not visible. Annotations don't change visibility.

It is hard to move the test class to another package, since all the Avro dependencies would break in that case. I have updated the PR to make both `optional` and the `NestedField` constructor protected, and inherited the `NestedField` class in the test class. However, that carries the risk of having to downgrade the `NestedField` constructor to private in the future, once this workaround is no longer needed. Another option is to keep both as private and use reflection in the test to access them as a temporary workaround.

Kept the constructor as private and the newly introduced `NestedField` APIs as public.
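
The reflection workaround mentioned in this thread could look roughly like the following. `Holder` is a hypothetical class invented for illustration (it is not `NestedField`), and the helper name `newHolder` is likewise an assumption. The point is only that `setAccessible(true)` lets test code reach a private constructor, whereas an annotation leaves visibility unchanged.

```java
import java.lang.reflect.Constructor;

/** Hypothetical class with a private constructor; not Iceberg's NestedField. */
final class Holder {
  private final String name;
  private final Object defaultValue;

  private Holder(String name, Object defaultValue) {
    this.name = name;
    this.defaultValue = defaultValue;
  }

  String name() {
    return name;
  }

  Object defaultValue() {
    return defaultValue;
  }
}

final class ReflectionAccessSketch {
  /** Builds a Holder through its private constructor, as a test-only workaround might. */
  static Holder newHolder(String name, Object defaultValue) {
    try {
      Constructor<Holder> ctor = Holder.class.getDeclaredConstructor(String.class, Object.class);
      ctor.setAccessible(true); // bypasses the private modifier for this Constructor object only
      return ctor.newInstance(name, defaultValue);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Reflection access failed", e);
    }
  }

  public static void main(String[] args) {
    Holder holder = newHolder("col", 42);
    System.out.println(holder.name() + " -> " + holder.defaultValue()); // prints "col -> 42"
  }
}
```

A test written this way breaks at run time rather than compile time if the constructor signature changes, which is one reason to treat it as temporary.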