Spark: Add Variant read support for Spark Iceberg tables #13219
Conversation
Force-pushed from aa48ff0 to 51ee01d.
@aokolnychyi, @szehon-ho Can you help check whether this is the right direction? Thanks.
I was chatting with @danielcweeks and he brought up a good point: one implication of releasing write support without shredding is that when shredding is added in the future, older readers won't be able to read those datasets.
It may be worth adding the support upfront so we avoid situations like "Spark 4 with Iceberg 1.10 has the limitation that it cannot read shredded datasets." Though that would be a considerable amount of work, and we would need to think through how to do it properly (the whole write path for shredded columns is a bit unclear; it would probably require some buffering/resetting based on the records just to figure out which schema to write with).
@aihuaxu I think it's important that we have a path forward for writing shredded columns before we introduce this. If we release a version that doesn't support reading shredded columns, it will be incompatible with future writers that produce shredded data. I also think we want to produce shredded values initially so that all readers accommodate shredding to begin with. I don't think it's safe to do this in isolation.
Thanks @danielcweeks and @amogh-jahagirdar for the suggestion. That makes sense. Initially I thought I could break up the changes to get feedback earlier. Let me incorporate shredding as well.
Force-pushed from 51ee01d to 9efae80.
@aihuaxu, I caught up with @danielcweeks about this yesterday and I think his concern was that we need to support reading shredded values. It would be nice to be able to write them as well, but as long as this can read them (and we have a test to validate it), we should be able to move forward with this. Thanks for your patience on this while I was out at conferences!
Force-pushed from 9efae80 to 9993c35.
Force-pushed from 9993c35 to e8e6736.
I have added a test case to read from a shredded variant. I didn't add multiple test cases like TestVariantReaders does; I just added one. We will use ParquetVariantReaders to read and convert to VariantVal, so the logic should already be covered by the existing TestVariantReaders. The shredded writer is not included yet.
Force-pushed from cd61881 to 78c5962.
Types.VariantType icebergVariantType = Types.VariantType.get();
DataType sparkVariantType = SparkSchemaUtil.convert(icebergVariantType);

assertThat(sparkVariantType).isEqualTo(VariantType$.MODULE$);
I think this should be instanceof VariantType, right? Instances are equivalent, so when we are accepting an object we typically use the instanceof check.
I think this goes even further by making sure it returns the singleton of VariantType. Would this be better?
If there can be instances of VariantType other than VariantType$.MODULE$, then I think it is better not to be overly restrictive.
From the implementation in TypeToSparkType.java, it can only be VariantType$.MODULE$, so it should be fine to check against the instance; it will never be any other instance. Let me know if I misunderstand here.
@Override
public DataType variant(Types.VariantType variant) {
return VariantType$.MODULE$;
}
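The trade-off being discussed can be illustrated with a plain-Java sketch (SingletonType here is a hypothetical stand-in for Spark's VariantType, whose only instance is VariantType$.MODULE$): when a type's constructor is private, checking identity against the singleton and checking instanceof accept exactly the same values, so the less restrictive instanceof form loses nothing.

```java
// Hypothetical sketch of the singleton-vs-instanceof discussion; SingletonType
// stands in for Spark's VariantType, which in Scala is a singleton object.
public class SingletonCheckDemo {
  static final class SingletonType {
    static final SingletonType INSTANCE = new SingletonType();

    private SingletonType() {} // private constructor: no other instances can exist
  }

  // Strict check: asserts the exact singleton instance is returned.
  static boolean isExactSingleton(Object type) {
    return type == SingletonType.INSTANCE;
  }

  // Conventional check: accepts any instance of the type.
  static boolean isInstanceOfType(Object type) {
    return type instanceof SingletonType;
  }

  public static void main(String[] args) {
    Object converted = SingletonType.INSTANCE; // what a type converter would return
    // With a private constructor the two checks always agree.
    System.out.println(isExactSingleton(converted) && isInstanceOfType(converted));
  }
}
```

Because the constructor is private, both assertions pass or fail together here, which is why the reviewers converge on either form being acceptable.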
Force-pushed from 551e941 to 58dfa24.
@aihuaxu, it looks like the test failures are only checkstyle. Can you update this to fix them?
This is missing an annotation and tests are failing, but this should be ready when those are fixed. Thanks, @aihuaxu!
Sorry about that. Let me fix that.
Stepped through this locally; all the changes look good to me, @aihuaxu!
@@ -98,10 +100,10 @@ private Dataset<Row> createDataset(List<GenericData.Record> records, Schema sche
}

// verify that the dataframe matches
assertThat(rows).hasSameSizeAs(records);
Iterator<GenericData.Record> recordIter = records.iterator();
assertThat(rows.size()).isEqualTo(records.size());
It is generally better to use assertThat(rows).hasSameSizeAs(records); as that will show you the contents of rows/records if the assertion ever fails. @aihuaxu, was there a particular reason why this check was changed?
@nastra I checked the usage, and assertThat(rows).hasSameSizeAs(records); does show the contents when the number of records doesn't match, compared to isEqualTo(), which helps with debugging. Let me change that.
This PR adds support for Spark to read Variant data from Iceberg tables. When reading Variant data (unshredded or shredded), the Spark VariantReader reads an Iceberg Variant and converts it to a Spark VariantVal. The Iceberg VariantReader handles reading both shredded and unshredded Iceberg Variants. Currently the VariantWriter handles writing unshredded Iceberg Variants only.
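As a rough illustration of the read path described above (the classes here are hypothetical stand-ins, not Iceberg's actual Variant or Spark's VariantVal): a Variant value travels as two binary buffers, a metadata buffer and a value buffer, and the reader's job is to produce those buffers, directly from an unshredded binary column or reassembled from shredded sub-columns, and wrap them in the engine's variant value type.

```java
// Hypothetical sketch of the shape of a Variant value; the real Iceberg and
// Spark readers decode and reassemble these buffers rather than just holding them.
public class VariantSketch {
  static final class VariantValue {
    final byte[] metadata; // dictionary of field names
    final byte[] value;    // encoded value bytes

    VariantValue(byte[] metadata, byte[] value) {
      this.metadata = metadata;
      this.value = value;
    }
  }

  // Stand-in for the final reader step: wrap the two buffers (read unshredded,
  // or rebuilt from typed sub-columns) into the engine-facing value type.
  static VariantValue toEngineValue(byte[] metadata, byte[] value) {
    return new VariantValue(metadata, value);
  }

  public static void main(String[] args) {
    VariantValue v = toEngineValue(new byte[] {0x01, 0x00, 0x00}, new byte[] {0x0C, 0x2A});
    System.out.println(v.metadata.length + " metadata bytes, " + v.value.length + " value bytes");
  }
}
```

The shredding discussion earlier in the thread is precisely about the "rebuilt from typed sub-columns" case: a reader that only understands the two-buffer unshredded form cannot reconstruct values written as shredded sub-columns.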