Add Protobuf 'any' type support to Kafka connector #17394

Merged
merged 2 commits into from
Jul 3, 2023

Conversation

adamjshook
Member

@adamjshook adamjshook commented May 8, 2023

Description

This adds support for the Protobuf any data type by converting the wrapped message to JSON. The default implementation uses the ProtobufUtils mechanism to parse the Descriptor. A separate implementation, used instead when the ConfluentModule is enabled, relies on Confluent's ProtobufSchemaProvider and caches the types it resolves.

This PR needs user-facing documentation which will be added as another commit.

Additional context and related issues

The Protobuf any type contains two fields: a type_url field and a value field holding the serialized bytes of an arbitrary Protobuf message. The type_url is expected to be a URL that resolves to a serialized descriptor of the message. This change builds a java.net.URL from the type_url, opens a stream, and reads the descriptor bytes, which are then converted into a Descriptor object. The Descriptor is passed to a JSON printer to serialize the message. Afterwards, all keys, including nested keys, have to be sorted because the Trino JSON type expects keys to be ordered.
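
The key-sorting step described above can be illustrated with a minimal standard-library sketch. This is not the PR's code: the class name `SortedJsonKeys` and the method `sorted` are hypothetical names chosen for the example, and it operates on plain `Map` and `List` values instead of a parsed JSON tree.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Trino.
final class SortedJsonKeys
{
    private SortedJsonKeys() {}

    // Returns a copy of the value in which every map, at any nesting depth,
    // has its keys in natural (lexicographic) order. Lists keep their order.
    static Object sorted(Object value)
    {
        if (value instanceof Map) {
            Map<String, Object> result = new TreeMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                result.put(String.valueOf(entry.getKey()), sorted(entry.getValue()));
            }
            return result;
        }
        if (value instanceof List) {
            List<Object> result = new ArrayList<>();
            for (Object element : (List<?>) value) {
                result.add(sorted(element));
            }
            return result;
        }
        // Scalars (strings, numbers, booleans, null) are returned unchanged.
        return value;
    }
}
```

Recursing into lists matters because a repeated field can hold nested messages whose keys also need ordering.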

This requires users to ensure that the URL is resolvable from the Coordinator and Worker hosts, either as a file:// path installed locally on each machine or as a remote https:// service.
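
As a rough sketch of what resolving a type_url involves on each host, the following reads the raw bytes behind a URL using only the Java standard library. The class name `DescriptorBytes`, the method `read`, and the explicit scheme check are assumptions made for illustration; the PR's actual code may validate and parse differently, and turning the bytes into a Descriptor is left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Trino.
final class DescriptorBytes
{
    // Assumption: only these schemes are accepted, matching the schemes named in this PR.
    private static final Set<String> ALLOWED_SCHEMES = Set.of("file", "http", "https");

    private DescriptorBytes() {}

    // Opens a stream for the given type URL and returns everything it serves.
    static byte[] read(String typeUrl)
            throws IOException
    {
        URI uri = URI.create(typeUrl);
        String scheme = uri.getScheme();
        if (scheme == null || !ALLOWED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Unsupported type URL scheme: " + typeUrl);
        }
        URL url = uri.toURL();
        try (InputStream in = url.openStream()) {
            return in.readAllBytes();
        }
    }
}
```

If the same file:// path is missing on any one worker, a call like this fails on that worker only, which is why the URL has to resolve on every host.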

Expands on #16836 which added oneof support

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# Kafka Connector
* Added support for reading Protobuf messages containing the `Any` Protobuf type. This is disabled by default and can be enabled by setting `kafka.protobuf-any-support-enabled` to `true`. Please see docs for more details.

@cla-bot cla-bot bot added the cla-signed label May 8, 2023
@adamjshook adamjshook requested review from hashhar and wendigo May 8, 2023 20:04
@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch from 1157e13 to b38f960 Compare May 9, 2023 23:04
Member

@hashhar hashhar left a comment

lgtm % cache auto-refresh behaviour

@@ -60,6 +68,8 @@

public class ProtobufColumnDecoder
{
    private static final ObjectMapper mapper = JsonMapper.builder().configure(ORDER_MAP_ENTRIES_BY_KEYS, true).build();
Member

if the ordering is relevant please add a comment why. copy the one from the sorted method in this class?

@@ -40,6 +41,8 @@
private int confluentSchemaRegistryClientCacheSize = 1000;
private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS);
private int confluentDescriptorCacheSize = 1000;
private Duration confluentDescriptorCacheRefreshInterval = new Duration(5, MINUTES);
Member

hmmm, IMO the cache shouldn't refresh automatically by default

Member Author

Updated to remove a configurable cache and instead use a non-evictable cache with a max size of 1000.
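
To make the trade-off concrete, here is a minimal standard-library sketch of a size-bounded cache without timed refresh. It reads "non-evictable" as: entries are never refreshed on a timer and cannot be explicitly invalidated, and are only displaced once the size cap is exceeded. That reading, the class name `BoundedLoadingCache`, and the LRU displacement policy are assumptions for illustration, not the PR's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical cache for illustration only; not part of Trino.
final class BoundedLoadingCache<K, V>
{
    private final Map<K, V> entries;
    private final Function<K, V> loader;

    BoundedLoadingCache(int maximumSize, Function<K, V> loader)
    {
        this.loader = loader;
        // Access-ordered map: the least recently used entry is dropped
        // as soon as the map grows past maximumSize.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true)
        {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
            {
                return size() > maximumSize;
            }
        };
    }

    // Loads the value on first access and then serves it from memory.
    // There is no refresh and no invalidate method by design.
    synchronized V get(K key)
    {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            entries.put(key, value);
        }
        return value;
    }

    synchronized int size()
    {
        return entries.size();
    }
}
```

With a cap of 1000, a descriptor is fetched once per type URL until more than 1000 distinct URLs have been seen, after which the least recently used entry is reloaded on its next access.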

@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch 2 times, most recently from cb8a552 to b8e19cf Compare May 25, 2023 12:39
@adamjshook adamjshook added the needs-docs This pull request requires changes to the documentation label May 25, 2023
@adamjshook adamjshook requested a review from hashhar May 25, 2023 12:40
@adamjshook
Member Author

@mosabua Could you please review the documentation for this type support? Thank you!

@adamjshook adamjshook requested a review from mosabua May 25, 2023 12:42
@github-actions github-actions bot added the docs label May 25, 2023
Member

@mosabua mosabua left a comment

Docs need some clarification and refinement. Great start and thanks for not ignoring the docs @adamjshook

@@ -88,6 +88,7 @@ Property name Description
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not.
``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_``
``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``.
``kafka.protobuf-any-support-enabled`` True to enable support for encoding Protobuf ``any`` types to JSON, defaults to ``false``.
Member

How about

Enable support for encoding Protobuf any types to JSON by setting the property to true, defaults to false.

===================================== =======================================

any
+++++
Member

same length as title


Protobuf schemas containing `google.protobuf.Any <https://protobuf.dev/programming-guides/proto3/#any>`_ ``
fields are mapped to a ``JSON`` field in Trino when
``kafka.protobuf-any-support-enabled`` is set to ``true``. Any message types
Member

This reads weird .. how about

Message types with an Any field contain an arbitrary ...

@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch from b8e19cf to 33d0809 Compare June 2, 2023 16:31
@adamjshook adamjshook requested a review from mosabua June 2, 2023 18:02
@adamjshook
Member Author

@mosabua I've made updates to the documentation, please give it another review and let me know of any changes. Thank you!

Member

@mosabua mosabua left a comment

One last nit, but essentially the docs are good to go.

google.protobuf.Any any_message = 1;
}

The corresponding Trino column will be named ``any_message`` of type ``JSON``
Member

Suggested change
- The corresponding Trino column will be named ``any_message`` of type ``JSON``
+ The corresponding Trino column is named ``any_message`` of type ``JSON``

@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch from 33d0809 to a75a191 Compare June 5, 2023 12:55
@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch from a75a191 to 9c94e5e Compare June 16, 2023 13:54
@adamjshook
Member Author

@hashhar I've rebased against master to pick up the changes from #17858.

Member

@hashhar hashhar left a comment

LGTM % some docs comments

@@ -88,6 +88,8 @@ Property name Description
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not.
``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_``
``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``.
``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf any types to JSON by setting to the property to ``true``,
Member

Suggested change
- ``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf any types to JSON by setting to the property to ``true``,
+ ``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf ``any`` types to ``JSON`` by setting to the property to ``true``,


Message types with an `Any <https://protobuf.dev/programming-guides/proto3/#any>`_
field contain an arbitrary serialized message as bytes and a type URL to resolve
that message's type, typically with a scheme of ``file://`` or ``http://``.
Member

are schemes other than file://, http:// or https:// supported? If not let's word this more explicitly to say exactly what schemes are supported.

Member Author

No, I've changed the docs to be more clear.

@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch from 9c94e5e to 30577b2 Compare June 26, 2023 15:22
@adamjshook adamjshook requested a review from hashhar June 26, 2023 15:22
@adamjshook adamjshook force-pushed the adamjshook/protobuf-any branch from 30577b2 to 47be527 Compare June 26, 2023 17:21
@hashhar hashhar force-pushed the adamjshook/protobuf-any branch from 47be527 to 29a2d3b Compare July 3, 2023 12:50
@hashhar
Member

hashhar commented Jul 3, 2023

Rebased to resolve conflicts. Will merge after CI.

@hashhar hashhar merged commit ee47797 into trinodb:master Jul 3, 2023
@github-actions github-actions bot added this to the 421 milestone Jul 3, 2023
Labels
cla-signed docs needs-docs This pull request requires changes to the documentation
3 participants