
Kafka Connect: Enable Parquet variant shredding for generic Record writes #16370

Open
soumilshah1995 wants to merge 3 commits into apache:main from soumilshah1995:kafka-connect-shredding

@soumilshah1995

Summary

Kafka Connect uses Iceberg’s generic Record model with a Void engine schema. Parquet variant shredding was ineffective on that path for two reasons: the generic ParquetFormatModel was registered without a variant shredding analyzer or row copier, and RecordVariantShreddingAnalyzer could not resolve VARIANT columns (the engine-side resolveColumnIndex is a dead end when the engine schema is Void).

This PR wires RecordVariantShreddingAnalyzer and Record::copy into GenericFormatModels and analyzes VARIANT columns by Iceberg Schema#columns() order so buffered inference and shredded Parquet columns work for Connect.

Changes

  • GenericFormatModels: register ParquetFormatModel with RecordVariantShreddingAnalyzer + Record::copy.
  • RecordVariantShreddingAnalyzer: implement analyzeVariantColumns using positional indices aligned with Record#get.
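
A minimal sketch of the positional idea described above, using plain Java collections rather than Iceberg classes. The class name, the method name, and the use of type-name strings are hypothetical stand-ins for illustration; the actual analyzer in this PR may be structured differently. The point is only that an index taken from schema column order can be used directly as a position in a positional record, with no engine schema involved.

```java
import java.util.ArrayList;
import java.util.List;

final class VariantPositionSketch {
  // Hypothetical helper: columnTypes lists the top-level column types
  // in schema order (a stand-in for walking Schema#columns()).
  // Returns the positions of the VARIANT columns in that order.
  static List<Integer> variantPositions(List<String> columnTypes) {
    List<Integer> positions = new ArrayList<>();
    for (int i = 0; i < columnTypes.size(); i++) {
      if ("variant".equals(columnTypes.get(i))) {
        positions.add(i); // same index a positional record lookup would use
      }
    }
    return positions;
  }

  public static void main(String[] args) {
    // Illustrative schema: id, payload (variant), ts, attrs (variant).
    List<String> types = List.of("long", "variant", "timestamp", "variant");
    System.out.println(variantPositions(types)); // prints [1, 3]
  }
}
```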

Config (Connect)

Table write properties (e.g. via iceberg.tables.write-props):

  • write.parquet.shred-variants=true
  • write.parquet.variant-inference-buffer-size=<rows>
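
As a rough illustration, the two properties above could be added to a connector configuration map as follows. This is a sketch under assumptions: it assumes the write properties are supplied as keys under an `iceberg.tables.write-props.` prefix, and the buffer size of 1000 rows is an arbitrary placeholder, not a recommended value. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ShreddingWritePropsSketch {
  // Assumption: table write properties are passed under this key prefix.
  static final String WRITE_PROPS_PREFIX = "iceberg.tables.write-props.";

  // Builds the two shredding-related entries for a connector config.
  static Map<String, String> shreddingProps(int bufferRows) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put(WRITE_PROPS_PREFIX + "write.parquet.shred-variants", "true");
    props.put(
        WRITE_PROPS_PREFIX + "write.parquet.variant-inference-buffer-size",
        Integer.toString(bufferRows));
    return props;
  }

  public static void main(String[] args) {
    // 1000 is a placeholder buffer size chosen only for the example.
    shreddingProps(1000).forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```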

Test plan

  • ./gradlew :iceberg-data:check :iceberg-kafka-connect:iceberg-kafka-connect:check (or CI green).
  • Connect sink writing VARIANT with write.parquet.shred-variants=true; inspect the Parquet file for typed_value paths and a higher physical column count than with the property set to false.
  • Regression: Connect append with shredding disabled still succeeds.
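
The manual inspection step could be approximated by counting column paths that contain a typed_value segment. The sketch below works on a plain list of dotted path strings; the paths, the class name, and the method name are made up for illustration, and reading real paths out of a Parquet footer is left out.

```java
import java.util.Arrays;
import java.util.List;

final class ShreddedPathCountSketch {
  // Counts dotted column paths that have a segment named "typed_value".
  static long countShreddedPaths(List<String> columnPaths) {
    return columnPaths.stream()
        .filter(path -> Arrays.asList(path.split("\\.")).contains("typed_value"))
        .count();
  }

  public static void main(String[] args) {
    // Hypothetical column paths for a file written with shredding enabled.
    List<String> paths = List.of(
        "id",
        "payload.metadata",
        "payload.value",
        "payload.typed_value.city.value",
        "payload.typed_value.city.typed_value");
    System.out.println(countShreddedPaths(paths)); // prints 2
  }
}
```

With shredding disabled, the same check would be expected to report zero such paths.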

@github-actions github-actions Bot added the data label May 16, 2026
@soumilshah1995 soumilshah1995 force-pushed the kafka-connect-shredding branch 2 times, most recently from 8ce1a9d to ccb48f1 Compare May 16, 2026 22:00
Register ParquetFormatModel with RecordVariantShreddingAnalyzer and Record::copy, and analyze VARIANT columns using Iceberg schema column order so shredding works with Void engine schemas (Kafka Connect).
@nssalian
Contributor

Thanks for the contribution, @soumilshah1995. Please fix the tests and get CI into good shape.

soumilshah199500 and others added 2 commits May 19, 2026 14:26
Cover Iceberg column-order resolution and FormatModelRegistry Parquet
shredding round-trip for generic Record writes (Kafka Connect path).

Co-authored-by: Cursor <cursoragent@cursor.com>
Rename setup() to before() per Iceberg test naming convention.

Co-authored-by: Cursor <cursoragent@cursor.com>
@soumilshah1995
Author

@nssalian
Thank you very much for taking the time to review my changes. This is my first contribution, and I truly appreciate your patience and guidance. I have addressed the changes you mentioned. Please let me know if there is anything else you would like me to update or improve.

3 participants