-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parquet: Implement Variant writers #12323
Conversation
360f531
to
086a16c
Compare
@@ -85,6 +85,16 @@ public static ShreddedObject object(VariantMetadata metadata) { | |||
return new ShreddedObject(metadata); | |||
} | |||
|
|||
public static ShreddedObject object(VariantObject object) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used to create a shredded object from an existing object when writing. It uses the object's metadata.
This avoids exposing VariantObject.metadata
because metadata is carried by Variant
instead of values.
@@ -62,7 +71,7 @@ protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean i | |||
} | |||
} | |||
|
|||
private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> { | |||
private class WriteBuilder extends TypeWithSchemaVisitor<ParquetValueWriter<?>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to detect a variant type, this needs to use the original Iceberg schema. When Parquet exposes the VARIANT
logical type annotation, we can update this to no longer require the schema.
private String name = "table"; | ||
private WriteSupport<?> writeSupport = null; | ||
private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null; | ||
private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is supports passing the schema to the write builder function.
@@ -71,6 +71,10 @@ public static UnboxedWriter<Short> shorts(ColumnDescriptor desc) { | |||
return new ShortWriter(desc); | |||
} | |||
|
|||
public static <T> ParquetValueWriter<T> unboxed(ColumnDescriptor desc) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to expose this to get float
and double
working. Those writers currently require a metrics builder that expects a non-null field ID.
private final VariantMetadata metadata; | ||
private final VariantValue value; | ||
|
||
VariantData(VariantMetadata metadata, VariantValue value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was originally in #12304, but I moved it here after reverting changes to variant classes in that PR. This uses it to create objects that are passed to writers and created by readers.
67c2bab
to
f4296c3
Compare
Rebased after moving variants to API in #12374. |
f4296c3
to
f3e3ccc
Compare
package org.apache.iceberg.variants; | ||
|
||
import java.util.List; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the same as variant reader PR, we are not including array, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right.
core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public ParquetVariantVisitor<ParquetValueWriter<?>> variantVisitor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can reduce the scope to protected for now. It doesn't seem to be accessed outside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of the Parquet visitor contract now. It needs to be public because that class is public.
@Override | ||
public ParquetValueWriter<?> value( | ||
GroupType value, ParquetValueWriter<?> valueWriter, ParquetValueWriter<?> typedWriter) { | ||
int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename this to valueDefinitionLevel to be consistent.
} | ||
|
||
static ParquetValueWriter<VariantValue> primitive( | ||
ParquetValueWriter<?> writer, PhysicalType... types) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We accept varargs because of BOOLEAN_TRUE or BOOLEAN_FALSE and we don't expect multiple types?
Can we add a check here to make sure only one type is passed other than boolean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be used for other cases in the future. I don't see a reason to limit it right now.
@@ -56,6 +58,19 @@ public class TypeToMessageType { | |||
LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS); | |||
private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = | |||
LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.MICROS); | |||
private static final String METADATA = "metadata"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we are defining the constants in multiple places. We don't want to be in API but does it make sense in core Variants.java?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really sure yet, so I've made them all private or package-private so we can look at where they should live later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@@ -160,9 +161,10 @@ public static class WriteBuilder implements InternalData.WriteBuilder { | |||
private final Map<String, String> metadata = Maps.newLinkedHashMap(); | |||
private final Map<String, String> config = Maps.newLinkedHashMap(); | |||
private Schema schema = null; | |||
private BiFunction<Integer, String, Type> variantShreddingFunc = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel less and less comfortable about using functional references like this in public interfaces. This really makes maintainability much harder because we're not documenting what this represents. If we're going have a reference like this, then we should just make an wrapper interface (e.g. VariantShreddingFunction
) that documents the parameters/return and allows for better navigation of usage.
This is particularly prevalent in parquet and avro, but it even here we have three different functional references that we're passing around. If it were internal, I'd be fine, but these are public apis so we should do better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I think this should have been a private constructor and this should only be available through the ParquetSchemaUtil
method that documents how it is used. I'll update it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the config method to document this and explain what is happening.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to use VariantShreddingFunction
.
dc141c8
to
4bcf388
Compare
Thanks for the reviews, @danielcweeks and @aihuaxu! |
This PR implements Variant writers for Parquet based on a Parquet schema passed into the writer builder. It works basically the same as #12139.