
Spark: Add Variant read support for Spark Iceberg tables #13219


Merged: 10 commits merged into apache:main from aihuaxu:aixu-spark-basic-variant on Jul 29, 2025

Conversation

@aihuaxu (Contributor) commented Jun 2, 2025

This PR adds support for Spark to read Variant data from Iceberg tables. When reading Variant data (unshredded or shredded), the Spark VariantReader reads an Iceberg Variant and converts it to a Spark VariantVal. The Iceberg VariantReader handles reading both shredded and unshredded Iceberg Variants; the VariantWriter currently handles writing unshredded Iceberg Variants only.
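At a high level, the conversion is a re-serialization: an Iceberg Variant carries separate metadata and value components, and Spark's VariantVal wraps exactly such a (value, metadata) byte pair. A minimal sketch of the idea follows; this is not the PR's actual code, and the Iceberg-side method names (metadata(), value(), sizeInBytes(), writeTo()) are assumptions about the org.apache.iceberg.variants API.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.iceberg.variants.Variant;
import org.apache.spark.unsafe.types.VariantVal;

class VariantConversions {
  // Serialize the Iceberg Variant's metadata and value components to byte
  // arrays and wrap them in Spark's VariantVal, whose constructor takes the
  // value bytes first and the metadata bytes second.
  static VariantVal toSparkVariantVal(Variant variant) {
    // the variant binary encoding is little-endian
    ByteBuffer metadata =
        ByteBuffer.allocate(variant.metadata().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
    variant.metadata().writeTo(metadata, 0);

    ByteBuffer value =
        ByteBuffer.allocate(variant.value().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
    variant.value().writeTo(value, 0);

    return new VariantVal(value.array(), metadata.array());
  }
}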

@aihuaxu (Contributor, Author) commented Jun 2, 2025

@aokolnychyi, @szehon-ho can you help check whether this is going in the right direction? Thanks.

@amogh-jahagirdar self-requested a review June 2, 2025 19:20
@amogh-jahagirdar (Contributor) left a comment

I was chatting with @danielcweeks, and he brought up a good point: one implication of releasing write support without shredding is that when shredding is added later, older readers wouldn't be able to read those datasets.

It may be worth adding that support upfront so we avoid situations like "Spark 4 / Iceberg 1.10 has the limitation that it cannot read shredded datasets". That would be a considerable amount of work, though, and we'd need to think through how to do it properly (the whole write path for shredded columns is a bit unclear; it probably requires some buffering/resetting based on the records just to figure out what schema to write with).

@danielcweeks (Contributor) commented

@aihuaxu I think it's important that we have a path forward for writing shredded columns before we introduce this. If we release a version that doesn't support reading shredded columns, it will be incompatible with future writers that produce shredded data.

I also think we want to produce shredded values initially so that all readers accommodate shredding to begin with. I don't think it's safe to do this in isolation.

@aihuaxu (Contributor, Author) commented Jun 4, 2025

> @aihuaxu I think it's important that we have a path forward for writing shredded columns before we introduce this. If we release a version that doesn't support reading shredded columns, it will be incompatible with future writers that produce shredded data.
>
> I also think we want to produce shredded values initially so that all readers accommodate shredding to begin with. I don't think it's safe to do this in isolation.

Thanks @danielcweeks and @amogh-jahagirdar for the suggestion. That makes sense. Initially I thought I could break up the changes to get feedback earlier. Let me incorporate shredding as well.

@aihuaxu force-pushed the aixu-spark-basic-variant branch from 51ee01d to 9efae80 on June 5, 2025 18:23
@rdblue (Contributor) commented Jun 26, 2025

@aihuaxu, I caught up with @danielcweeks about this yesterday, and I think his concern was that we need to support reading shredded values. It would be nice to be able to write them as well, but as long as this can read them (and we have a test to validate it), we should be able to move forward with this. Thanks for your patience while I was out at conferences!

@aihuaxu force-pushed the aixu-spark-basic-variant branch from 9efae80 to 9993c35 on July 13, 2025 06:17
@aihuaxu changed the title from "Spark: Add basic Variant read/write support for Spark Iceberg tables without shredding" to "Spark: Add Variant read support for Spark Iceberg tables" on Jul 13, 2025
@aihuaxu force-pushed the aixu-spark-basic-variant branch from 9993c35 to e8e6736 on July 14, 2025 18:47
@aihuaxu (Contributor, Author) commented Jul 14, 2025

> @aihuaxu, I caught up with @danielcweeks about this yesterday, and I think his concern was that we need to support reading shredded values. It would be nice to be able to write them as well, but as long as this can read them (and we have a test to validate it), we should be able to move forward with this. Thanks for your patience while I was out at conferences!

I have added a test case to read from a shredded variant. I didn't add multiple test cases like TestVariantReaders, just the one; we use ParquetVariantReaders to read and convert to VariantVal, so the logic should already be covered by the existing TestVariantReaders.

A shredded writer is not included yet.
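For context on what the shredded read path has to handle: per the Parquet variant shredding spec, a shredded variant column is stored as a group with a required binary metadata field, an optional binary value field holding the unshredded residual, and an optional typed_value field holding the strongly typed shredded portion. Below is a hedged sketch of such a file schema using parquet-java's Types builder; shredding as int64 here is purely an illustrative choice, not necessarily what the test uses.

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

class ShreddedVariantSchemas {
  // Variant group shredded as int64: integer values land in typed_value,
  // anything else falls back to the binary value field.
  static GroupType shreddedAsInt64(String name) {
    return Types.buildGroup(Type.Repetition.OPTIONAL)
        .required(PrimitiveTypeName.BINARY)
        .named("metadata")
        .optional(PrimitiveTypeName.BINARY)
        .named("value")
        .optional(PrimitiveTypeName.INT64)
        .named("typed_value")
        .named(name);
  }
}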

@aihuaxu force-pushed the aixu-spark-basic-variant branch from cd61881 to 78c5962 on July 25, 2025 22:51
Types.VariantType icebergVariantType = Types.VariantType.get();
DataType sparkVariantType = SparkSchemaUtil.convert(icebergVariantType);

assertThat(sparkVariantType).isEqualTo(VariantType$.MODULE$);
Contributor:

I think this should be an instanceof VariantType check, right? Instances are equivalent, so when we're accepting an object we typically use the instanceof check.

Contributor (Author):

I think this goes even further by making sure it's returning the VariantType singleton. Would this be better?

Contributor:

If there can be instances of VariantType other than VariantType$.MODULE$, then I think it is better not to be overly restrictive.

Contributor (Author):

From the implementation, it can only be VariantType$.MODULE$ in TypeToSparkType.java, so it should be fine to check against the instance; it will never be another instance. Let me know if I'm misunderstanding here.

  @Override
  public DataType variant(Types.VariantType variant) {
    return VariantType$.MODULE$;
  }
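For reference, here are the two assertion styles under discussion side by side, as a small illustrative sketch (the wrapper class and method are made up; VariantType and VariantType$ are Spark's):

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.VariantType;
import org.apache.spark.sql.types.VariantType$;

class VariantTypeAssertions {
  static void check(DataType sparkVariantType) {
    // the looser check suggested in review: any VariantType instance passes
    assertThat(sparkVariantType).isInstanceOf(VariantType.class);
    // the check the test uses: equality against the Scala singleton that
    // TypeToSparkType returns
    assertThat(sparkVariantType).isEqualTo(VariantType$.MODULE$);
  }
}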

@aihuaxu force-pushed the aixu-spark-basic-variant branch from 551e941 to 58dfa24 on July 26, 2025 06:14
@aihuaxu requested a review from rdblue July 26, 2025 06:16
@rdblue (Contributor) commented Jul 28, 2025

@aihuaxu, it looks like the test failures are only checkstyle. Can you update this to fix them?

@rdblue (Contributor) commented Jul 28, 2025

This is missing an annotation and tests are failing, but this should be ready when those are fixed. Thanks, @aihuaxu!

@aihuaxu (Contributor, Author) commented Jul 28, 2025

> @aihuaxu, it looks like the test failures are only checkstyle. Can you update this to fix them?

Sorry about that. Let me fix that.

@amogh-jahagirdar (Contributor) left a comment

Stepped through this locally; all the changes look good to me, @aihuaxu!

@amogh-jahagirdar merged commit 5710569 into apache:main Jul 29, 2025
42 checks passed
@amogh-jahagirdar (Contributor) commented

Thanks @aihuaxu @rdblue !

@@ -98,10 +100,10 @@ private Dataset<Row> createDataset(List<GenericData.Record> records, Schema sche
   }

   // verify that the dataframe matches
-  assertThat(rows).hasSameSizeAs(records);
+  assertThat(rows.size()).isEqualTo(records.size());
   Iterator<GenericData.Record> recordIter = records.iterator();
Contributor:

It is generally better to use assertThat(rows).hasSameSizeAs(records); as that will show you the content of rows/records if the assertion ever fails. @aihuaxu, was there a particular reason why this check was changed?

Contributor (Author):

@nastra I checked the usage, and assertThat(rows).hasSameSizeAs(records); does show the content when the number of records doesn't match, compared to isEqualTo(), which helps with debugging. Let me change that.
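For illustration, here are the two size checks side by side: on failure, AssertJ's hasSameSizeAs reports the contents of the collections, while comparing sizes with isEqualTo reports only the two numbers (the wrapper class is made up):

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

class SizeAssertions {
  static void compare(List<?> rows, List<?> records) {
    // failure message includes the elements of the compared collections
    assertThat(rows).hasSameSizeAs(records);
    // failure message only reports the two sizes
    assertThat(rows.size()).isEqualTo(records.size());
  }
}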
