
Data: Support reading default values from generic Avro readers #6004

Conversation

@rzhang10 (Contributor) commented Oct 17, 2022:

This PR implements support for reading default values in the generic Avro readers in the iceberg-data module, demonstrated by data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java.

The PR is based on the API changes in #4732

Subsequent PRs will mimic this approach to implement the same feature for (Spark/Flink) x (Avro/ORC/Parquet) readers.

@rdblue can you please take a look?

return new NestedField(true, id, name, type, doc, null, null);
}

public static NestedField optional(

@rdblue (Contributor), Oct 23, 2022:

I thought we talked about making these package-private until the implementation was complete so that it doesn't leak into the public API before the values are read correctly? Is there some reason why that wouldn't work?

@rzhang10 (Contributor, author), Oct 26, 2022:

The reason is that I need to instantiate a NestedField with a default value in the TestReadDefaultValues class, but that class lives in the iceberg-data module, which is not in the same package as Types.java in the API module, so package-private doesn't work.

I can annotate this method as VisibleForTesting and/or Beta; do you think that works?

Contributor:

Can you move the test class? We can't merge this unless these are not visible. Annotations don't change visibility.

Contributor:

It is hard to move the test class to another package since all the Avro dependencies would break in that case. I have updated the PR to make both optional and the NestedField constructor protected, and extended the NestedField class in the test class. However, that carries the risk of having to downgrade the NestedField constructor to private in the future once this workaround is no longer needed.
Another option is to keep both private and use reflection in the test to access them as a temporary workaround.

Contributor:

Kept the constructor private and the newly introduced NestedField APIs public.
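
For context, a minimal sketch of that end state, assuming the two trailing constructor arguments are the initial and write defaults added in #4732 (signatures inferred from the constructor call shown above, not copied from Types.java):

    // Sketch only: the constructor stays private; the default-aware
    // factory method is public.
    public static NestedField optional(
        int id, String name, Type type, String doc,
        Object initialDefault, Object writeDefault) {
      return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
    }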

field.name());
// If the field from Iceberg schema has initial default value, we just pass and don't

@rdblue (Contributor), Oct 23, 2022:

Style: In Iceberg, don't use personal pronouns like "I" or "we" in documentation or comments because it makes documentation harder to understand (who is "we"?) and longer. Instead, be direct:

If the field has an initial default value, do not project it in the read schema. The default will be added in the Iceberg ValueReader.

// If the field from Iceberg schema has initial default value, we just pass and don't
// project it to the avro file read schema with the generated _r field name,
// the default value will be directly read from the Iceberg layer
if (field.initialDefault() != null) {

Contributor:

Why does the case for a null default add a field while this one does not? That seems strange to me. I think this should either add a fake field in both cases or not.

I think the question hinges on why the fake field is added for the other case. I think it has something to do with the record schema needing to match the Iceberg schema exactly -- the record schema should not be different. If that's the case then it should be added for the non-null initial default.

Contributor (author):

I see. I originally treated using continue as an implementation trick: although the output Avro schema of the BuildAvroProjection process lacks the field, that is taken care of when the schemas are compared again in the ValueReader.

I think I can make the code logic more consistent by still creating a fake field with some other suffix, like "_d" instead of "_r". The "_d" field will not be projected in the real file either, and the default value read is still handled by the ValueReader. WDYT?
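
A rough sketch of that proposal (avroField, updatedFields, and createPlaceholder are illustrative names, not the actual BuildAvroProjection code):

    // Project a renamed placeholder for any field missing from the file schema,
    // so the record schema matches the Iceberg schema in both cases; the
    // ValueReader later supplies null or the initial default.
    if (avroField == null) {
      String suffix = field.initialDefault() != null ? "_d" : "_r";
      updatedFields.add(createPlaceholder(field.name() + suffix, field));
      continue;
    }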

Contributor:

What is the purpose of changing the name of the placeholder field that is added? Just use the same code that's already here.

Contributor:

I think it depends on the implementation of ValueReaders. In the case of defaults, ValueReaders takes care of filling in the default value. In the case of optionals, it is delegated to Avro to fill in the null values. I like the idea of doing it in the ValueReaders so it is consistent across all file formats, even when the file format does not support the semantic (e.g., ORC schema does not support default values).

@@ -612,6 +615,9 @@ protected StructReader(
} else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
positionList.add(pos);
constantList.add(false);
} else if (record.getField(field.name()) == null && field.initialDefault() != null) {

Contributor:

I think the reason for leaving out the field in BuildAvroProjection was to make this check work. But I'm pretty sure that the Avro schema needs to match the Iceberg schema. So this will probably need some other way of detecting that the field will not be read, because it was missing from the file schema.

Contributor (author):

Sure. I suggested the above approach of creating a "_d" field, which can be a marker, or I can use a custom Avro property, something like "<HAS_DEFAULT -> true>", to signify it. WDYT?

Contributor:

Passing information through field names is error prone because you don't control the names of real columns.

Instead, what about adding a metadata key to the Avro schema field that signals this?
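
A minimal sketch of the metadata-key idea; the property key here is hypothetical, but Avro's Schema.Field does support arbitrary string properties through addProp/getProp:

    import org.apache.avro.Schema;

    class DefaultValueMarker {
      // Hypothetical property key, not one defined by this PR.
      private static final String HAS_DEFAULT_PROP = "iceberg.has-default";

      static void markHasDefault(Schema.Field field) {
        field.addProp(HAS_DEFAULT_PROP, "true");
      }

      static boolean hasDefault(Schema.Field field) {
        return "true".equals(field.getProp(HAS_DEFAULT_PROP));
      }
    }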

Contributor:

In the latest patch, record is no longer required in this method. The default value is set here using a mechanism similar to positions and constants, and then it is used in the read() method. Removing the dependency on record simplified this API and a few others, as the Schema record parameter is no longer required in the signature (as before this patch).
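
A sketch of that mechanism, using the array names settled on later in this review (the actual patch may differ):

    // Parallel arrays captured in the constructor, mirroring positions/constants.
    private final int[] defaultPositions;
    private final Object[] defaultValues;

    // At the end of read(), fill in positions that were not read from the file;
    // set(...) is the existing abstract setter on StructReader.
    for (int i = 0; i < defaultPositions.length; i += 1) {
      set(struct, defaultPositions[i], defaultValues[i]);
    }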

@@ -205,7 +205,7 @@ public void testMissingRequiredFields() {
AssertHelpers.assertThrows(
"Missing required field in nameMapping",
IllegalArgumentException.class,
"Missing required field: x",
"Missing required field that doesn't have a default value: x",

Contributor:

"with no default value"

Contributor:

I think "Missing required field" is adequate. Based on where this error is placed, Missing field with no default value could be an ambiguous message.

Contributor:

I'm just saying that it should be "with no default value" rather than saying "doesn't have a default value". We try to be as direct as possible.

@wmoustafa (Contributor), May 15, 2023:

I am saying Missing required field with no default could give the impression that the fix is to add a default value (also, depending on how one reads it, it could mean the fix is to introduce both the field and the default value); however, in fact the fix is either to add the field or to add a default value. Hence, if we leave it at Missing required field, it should be adequate. We could also say Missing required field, or missing default value.

expected = ByteBuffers.toByteArray((ByteBuffer) expected);
}
if (actual instanceof ByteBuffer) {
actual = ByteBuffers.toByteArray((ByteBuffer) actual);

Contributor:

This looks suspicious. Are you sure that there is a mismatch in the readers?

Contributor (author):

Yes, in Type.java, the fixed type is associated with ByteBuffer:

    FIXED(ByteBuffer.class),

So theoretically, fixed-type data should always be of ByteBuffer type instead of byte array.
The problem here is actually that RandomGenericData generates byte[] data for the fixed type (I tried to change that, but it resulted in more test failures because some existing Avro writers depend on it, and Avro can only write fixed-type data represented by byte[]).

So I instead added the conversion in this assertion helper class to convert the ByteBuffer back to byte[] for a compatible comparison.

Contributor (author):

Without the above conversion, the FixedType case in my TestReadDefaultValues will fail.

Contributor:

For the Iceberg generic in-memory representation, byte[] is correct. The reader should be producing byte[] and not ByteBuffer. The Iceberg type's parameter is for Iceberg internals, not the generic representation.

Contributor:

In the latest patch, this check is removed, and SingleValueParser moved to byte[]. However, it seems that Literals still uses ByteBuffer for FIXED type, so the discrepancy showed up again in SingleValueParser, where it needs to handle both byte[] and ByteBuffer inputs. There is a more detailed comment on this in SingleValueParser.

Contributor:

Also to clarify where the dependency on byte[] is coming from, it actually comes from DataTestHelpers. We can see here that the FIXED case fails the assertion if the data type is not byte[]. Hence, there is a conflict between DataTestHelpers which uses byte[] and Literals which uses ByteBuffer. Such a gap is bridged in SingleValueParser#toJson in the current PR.
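
An illustrative helper for that bridging (not the actual SingleValueParser code): normalize FIXED values to byte[] whether they arrive as byte[] (DataTestHelpers, generics) or ByteBuffer (Literals).

    import java.nio.ByteBuffer;
    import org.apache.iceberg.util.ByteBuffers;

    static byte[] toFixedBytes(Object value) {
      if (value instanceof ByteBuffer) {
        return ByteBuffers.toByteArray((ByteBuffer) value);
      }
      return (byte[]) value;
    }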

Contributor:

@wmoustafa what is the failure?

Contributor:

If we keep SingleValueParser and DataTestHelpers without any change, the error is:

org.apache.iceberg.data.avro.TestReadDefaultValues > writeAndValidate FAILED
    java.lang.AssertionError: [Expected should be a byte[]] 
    Expecting actual:
      java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]
    to be an instance of:
      byte[]
    but was instance of:
      java.nio.HeapByteBuffer
        at org.apache.iceberg.data.DataTestHelpers.assertEquals(DataTestHelpers.java:93)
        at org.apache.iceberg.data.DataTestHelpers.assertEquals(DataTestHelpers.java:39)
        at org.apache.iceberg.data.avro.TestReadDefaultValues.writeAndValidate(TestReadDefaultValues.java:164)

@rdblue (Contributor), May 7, 2023:

Okay, I think this does indicate a bug.

You're reading into Iceberg generics, so the values should all match the classes created by Iceberg generics. Those are not the same as the internal classes used by Iceberg because they are intended to be friendlier. For example, Iceberg uses days from the Unix epoch to represent a date, but generics will return Java's LocalDate.

Here's a diff that fixes your tests by using the correct types for the expected values:

diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
index 4cb4126315..c19d75f000 100644
--- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
+++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iceberg.data;
 
+import java.nio.ByteBuffer;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.DateTimeUtil;
 
 public class IdentityPartitionConverters {
@@ -46,7 +48,9 @@ public class IdentityPartitionConverters {
           return DateTimeUtil.timestampFromMicros((Long) value);
         }
       case FIXED:
-        if (value instanceof GenericData.Fixed) {
+        if (value instanceof ByteBuffer) {
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        } else if (value instanceof GenericData.Fixed) {
           return ((GenericData.Fixed) value).bytes();
         }
         return value;
diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
index 0f98d2e571..0fb50566bf 100644
--- a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
+++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.data.DataTestHelpers;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
@@ -133,7 +134,7 @@ public class TestReadDefaultValues {
       for (Record record : generatedRecords) {
         Record expectedRecord = GenericRecord.create(readerSchema);
         expectedRecord.set(0, record.get(0));
-        expectedRecord.set(1, defaultValue);
+        expectedRecord.set(1, IdentityPartitionConverters.convertConstant(type, defaultValue));
         expected.add(expectedRecord);
       }

The tests still fail, but now they show the real problem: default values are not being converted to match the object model. For partition constants, that is done in GenericReader:

    Map<Integer, ?> partition =
        PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);

We will need to do something similar for default values. I think what makes the most sense is to add a conversion function to StructReader that needs to be implemented by sub-classes. That can call the existing conversion functions, like IdentityPartitionConverters.convertConstant or similar for Spark.
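
One possible shape for that hook, with hypothetical names (a sketch, not the final API): the base class asks its subclass to convert each default into the engine's object model, and the generic implementation delegates to IdentityPartitionConverters.convertConstant.

    // In StructReader (sketch):
    protected abstract Object convertDefault(Type type, Object defaultValue);

    // In the Iceberg generics subclass (sketch):
    @Override
    protected Object convertDefault(Type type, Object defaultValue) {
      return IdentityPartitionConverters.convertConstant(type, defaultValue);
    }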

@wmoustafa (Contributor), May 16, 2023:

That is some deep analysis, with 3 levels of conversion to get it right :) I have directly called IdentityPartitionConverters.convertConstant from the StructReader constructor. Ideally, IdentityPartitionConverters should be renamed in a future release since it is no longer specific to partitions.

@@ -586,37 +588,51 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
}

if (isDeletedColumnPos == null) {
this.positions = new int[0];
this.constants = new Object[0];
this.constantValuesPositions = new int[0];

Contributor:

"ValuesPositions" is a bit awkward with multiple plural parts. I think it is also longer than needed because the variables would make sense as constantPositions and constantValues.

Contributor:

Used constantPositions, constantValues, defaultPositions, defaultValues for consistency.

- code: "java.method.removed"
old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::addedFiles()"
justification: "Deprecations for 1.0 release"
- code: "java.method.removed"
old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::deletedFiles()"
justification: "Deprecations for 1.0 release"

Contributor:

Can you revert the changes that moved these around?

Contributor:

These were moved around due to the API change in NestedField. It should go away once we fix the access modifiers for optional and the constructor. Access modifiers were changed to accommodate keeping the new APIs private, but we found there was no good solution for that, so we will just go with making the new APIs public (as is the case in the end state). This will take care of keeping this file intact as well.

Contributor:

Right now this only contains

    - code: "java.class.defaultSerializationChanged"
      old: "class org.apache.iceberg.types.Types.NestedField"
      new: "class org.apache.iceberg.types.Types.NestedField"
      justification: "Serialization across versions is not supported"

I think this should be expected as we added a new API.

Contributor:

Yes, that sounds fine.

List<ValueReader<?>> readers,
StructType struct,
Schema record,
Map<Integer, ?> idToConstant) {

Contributor:

It looks like this change isn't needed, nor is the update to add Schema to createStructReader.

Contributor:

Reverted.

for (Record record : generatedRecords) {
Record expectedRecord = GenericRecord.create(readerSchema);
expectedRecord.set(0, record.get(0));
expectedRecord.set(1, defaultValue);

Contributor:

I think the problem with fixed might be introduced here. Iceberg generics will not produce the same value as the internal representation. If you expect ByteBuffer here, the comparison will fail because the generics are going to produce byte[].

You could stop setting the reader func to DataReader::create below, but then the GenericRecord instances would not be produced. What I'd do here is add a convert method in this test to convert all of the types that don't match between the two, like timestamps, dates, and fixed.
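
A hypothetical test-local helper along those lines (a sketch only; the PR ultimately reused IdentityPartitionConverters.convertConstant, as in the diff above):

    import java.nio.ByteBuffer;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.util.ByteBuffers;
    import org.apache.iceberg.util.DateTimeUtil;

    static Object toGenericValue(Type type, Object value) {
      switch (type.typeId()) {
        case DATE:
          return DateTimeUtil.dateFromDays((Integer) value); // LocalDate
        case FIXED:
          // generics use byte[], internals use ByteBuffer
          return value instanceof ByteBuffer
              ? ByteBuffers.toByteArray((ByteBuffer) value)
              : value;
        default:
          return value; // timestamps would need a similar case
      }
    }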

Contributor:

Addressed per above discussion.

};

@Test
public void writeAndValidate() throws IOException {

Contributor:

I think we also want to write a file with explicit null values to validate that if the column is present we won't insert the initial default.

Contributor:

Added one more test case to test this.

@wmoustafa force-pushed the iceberg_data_generic_avro_read_default_value branch from 1b036d9 to 70ae07b on April 28, 2023.

@wmoustafa (Contributor):

@rdblue Latest commit reverts the change in SingleValueParser (as requested, in order to test), so it is supposed to break.

List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> constantPositionsList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size());

Contributor:

Should be defaultPositionList

for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
Object reusedValue = get(struct, field.pos());
set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
existingFieldPositionsSet.add(field.pos());
}
// Set default values

Contributor:

Style: missing empty line between the control flow block and the following statement.

}

Contributor:

Nit: unnecessary whitespace change.

// reader.
defaultValuesPositionList.add(pos);
defaultValuesList.add(
IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault()));

Contributor:

This fixes the problem for Iceberg generics, but this conversion is not valid for other in-memory data models, like Spark or Flink. Those also extend this class.

We either need to pass in the conversion method or pass in a map of default constants like the idToConstant map above. I think I'd prefer to pass in an idToDefault map that is used in the same way and produced from the schema.

Using a map will also help solve the problem of structs with nested defaults.
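
A sketch of producing such a map from the schema (the helper name and conversion hook are illustrative, and this handles top-level fields only; nested defaults would need recursion):

    import java.util.Map;
    import java.util.function.BiFunction;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    static Map<Integer, Object> idToDefault(
        Types.StructType struct, BiFunction<Type, Object, Object> convert) {
      ImmutableMap.Builder<Integer, Object> builder = ImmutableMap.builder();
      for (Types.NestedField field : struct.fields()) {
        if (field.initialDefault() != null) {
          builder.put(field.fieldId(), convert.apply(field.type(), field.initialDefault()));
        }
      }
      return builder.build();
    }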

Contributor:

Also, in order to make sure that we are not producing incorrect results for the other object models, we will need to fail if a default value is needed by this class, but is unavailable.

import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestReadDefaultValues {

Contributor:

When Spark and Flink support are added, we will need to have a similar test there.

}

@Test
public void testDefaultValueNotApplied() throws IOException {

Contributor:

Looks good.

@rdblue (Contributor) commented Sep 4, 2023:

@wmoustafa, looks like there are test failures. Can you take a look?

old: "method void org.apache.iceberg.avro.ValueReaders.StructReader<S>::<init>(java.util.List<org.apache.iceberg.avro.ValueReader<?>>,\
\ org.apache.iceberg.types.Types.StructType, java.util.Map<java.lang.Integer,\
\ ?>)"
new: "method void org.apache.iceberg.avro.ValueReaders.StructReader<S>::<init>(java.util.List<org.apache.iceberg.avro.ValueReader<?>>,\

Contributor:

There's no need to break compatibility. Can you add a constructor that defaults the idToDefault map to ImmutableMap.of()?
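
A minimal sketch of that compatibility overload, matching the old signature in the revapi entry above:

    // Keep the old constructor; default to no field defaults.
    protected StructReader(
        List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
      this(readers, struct, idToConstant, ImmutableMap.of());
    }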

Contributor:

Yes. Done.

@wmoustafa (Contributor):

> @wmoustafa, looks like there are test failures. Can you take a look?

Fixed.

@rdblue (Contributor) commented Dec 22, 2023:

@wmoustafa, I just opened a refactor that should make this a lot easier to get in: #9366

When I went to thoroughly review this, I had to do a lot of research into how buildAvroProjection works and how this changes it. It is really hard to make changes like this given how complex that projection process is. To solve the problem, I refactored to pull all of the constant and default value logic into the read builder. Now it's simpler and can easily be extended with default values. The builder also now uses the expected Iceberg schema directly, so we can just add cases for readers that supply default values as constants, like how the code works for partition constants.

private final int[] constantPositions;
private final Object[] constantValues;
private final int[] defaultPositions;
private final Object[] defaultValues;

Contributor:

If we go with the approach in #9366, then we should no longer need changes here, which was what originally made this difficult.

@rdblue (Contributor) commented Jan 1, 2024:

Now that #9366 is in, I think we should perform a similar refactor for the Iceberg generics and add default support that way. It should be much simpler!

github-actions bot:

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions bot added the stale label on Aug 23, 2024.

github-actions bot:

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions bot closed this on Sep 11, 2024.