Skip to content

Kafka Connect: Add variant type support#15498

Closed
alexkot1394 wants to merge 4 commits into
apache:mainfrom
alexkot1394:kafka-connect-variant-support
Closed

Kafka Connect: Add variant type support#15498
alexkot1394 wants to merge 4 commits into
apache:mainfrom
alexkot1394:kafka-connect-variant-support

Conversation

@alexkot1394
Copy link
Copy Markdown

Summary

Adds support for the VARIANT data type in the Kafka Connect sink connector.

Background

When attempting to write records with variant fields to Iceberg tables via the Kafka Connect sink, the connector would throw:

java.lang.UnsupportedOperationException: Unsupported type: VARIANT
    at org.apache.iceberg.connect.data.RecordConverter.convertValue(RecordConverter.java:146)

This prevented users from using the Kafka Connect sink with Databricks tables or any Iceberg tables containing variant columns.

Changes

  • Added convertVariant() method to RecordConverter that handles conversion from Kafka Connect data types to Iceberg Variant
  • Supports conversion from:
    • JSON strings
    • Maps (converted to JSON objects)
    • Lists (converted to JSON arrays)
    • Kafka Connect Structs
    • Primitives (numbers, booleans)
    • Already-serialized variants (ByteBuffer, byte[])
  • Added test coverage in TestRecordConverter.testVariantConversion()

Implementation Details

The variant conversion wraps incoming data as JSON and creates an Iceberg Variant object with:

  • Empty metadata (for simplicity)
  • The JSON-serialized value

This approach is consistent with how other complex types are handled in the connector.

Testing

  • Added unit test testVariantConversion() that validates conversion of:
    • JSON strings
    • Complex nested objects (Maps with Lists)
    • Primitive values
  • All existing tests pass
  • Manually tested with Databricks Unity Catalog tables containing variant columns

Related Issues

Closes #15443

This adds support for the VARIANT data type in the Kafka Connect sink
connector's RecordConverter. Previously, attempting to write variant
fields resulted in UnsupportedOperationException.

Changes:
- Added convertVariant() method to handle conversion from Kafka Connect
  data types to Iceberg Variant
- Supports JSON strings, Maps, Lists, Structs, and primitives
- Added test coverage for variant field conversion

Resolves: apache#15443
Wrap Variant.from() calls in try-catch blocks to handle cases where
ByteBuffer/byte[] may not contain valid serialized variants. If
deserialization fails, fall back to JSON conversion instead of
throwing an exception.

This makes the connector more robust when handling binary data that
may not be properly formatted variant data.
Extract ByteBuffer/byte[] deserialization logic into a separate
tryDeserializeVariant() method to reduce cyclomatic complexity
from 13 to under the allowed limit of 12.
}

// Try to deserialize if it's a ByteBuffer or byte[]
Variant deserialized = tryDeserializeVariant(value);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a safe way to handle binary data coming through. We should assume binary data is just bytes. If we "try" to process a bytebuffer we may affect the internal state and not all implementations may support mark/reset behavior. It appears that we're also dropping it if we can't deserialize it, which isn't good.

jsonString = new String(data, StandardCharsets.UTF_8);
} else if (value instanceof Number || value instanceof Boolean) {
// For primitives, convert to JSON representation
jsonString = value.toString();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to treat all primitives this way. We should handle each of the primitive types appropriately and convert to the correct variant primitive type.

@danielcweeks
Copy link
Copy Markdown
Contributor

Hey @alexkot1394 you might also want to confer with the author of #15283 as they're trying to accomplish the same thing.

@github-actions
Copy link
Copy Markdown

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label Apr 11, 2026
@github-actions
Copy link
Copy Markdown

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Apr 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Variant (and possibly other v3 types) in KafkaConnect

2 participants