Add Protobuf 'any' type support to Kafka connector #17394
Conversation
Force-pushed from 1157e13 to b38f960
lgtm % cache auto-refresh behaviour
@@ -60,6 +68,8 @@
public class ProtobufColumnDecoder
{
    private static final ObjectMapper mapper = JsonMapper.builder().configure(ORDER_MAP_ENTRIES_BY_KEYS, true).build();
If the ordering is relevant, please add a comment explaining why. Copy the one from the `sorted` method in this class?
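For context, `ORDER_MAP_ENTRIES_BY_KEYS` is a standard Jackson `SerializationFeature` that sorts map entries by key during serialization; the Trino `JSON` type expects ordered keys (see the PR description below). A minimal standalone sketch, not part of this PR, showing the effect:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

import java.util.LinkedHashMap;
import java.util.Map;

import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;

public class SortedKeysExample
{
    public static void main(String[] args)
            throws Exception
    {
        ObjectMapper mapper = JsonMapper.builder()
                .configure(ORDER_MAP_ENTRIES_BY_KEYS, true)
                .build();

        // Insertion order is b, a; with the feature enabled, serialization emits keys sorted as a, b
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("b", 2);
        value.put("a", 1);

        System.out.println(mapper.writeValueAsString(value)); // {"a":1,"b":2}
    }
}
```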
@@ -40,6 +41,8 @@
    private int confluentSchemaRegistryClientCacheSize = 1000;
    private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
    private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS);
    private int confluentDescriptorCacheSize = 1000;
    private Duration confluentDescriptorCacheRefreshInterval = new Duration(5, MINUTES);
hmmm, IMO the cache shouldn't refresh automatically by default
Updated to remove a configurable cache and instead use a non-evictable cache with a max size of 1000.
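For illustration only, the PR presumably uses Trino's own cache utilities rather than this exact code, but a size-bounded cache with no time-based refresh can be sketched with Guava as follows (the class and method names below are hypothetical):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Descriptors.Descriptor;

public class DescriptorCacheSketch
{
    // Bounded by size only; no expireAfterWrite/refreshAfterWrite, so entries are
    // never refreshed automatically (a hypothetical stand-in for the PR's cache)
    private final Cache<String, Descriptor> descriptorCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .build();

    public Descriptor getDescriptor(String typeUrl)
            throws Exception
    {
        // Load on first access, then serve from the cache
        return descriptorCache.get(typeUrl, () -> loadDescriptor(typeUrl));
    }

    private Descriptor loadDescriptor(String typeUrl)
    {
        // Placeholder: resolution of the type URL is sketched separately in the PR description below
        throw new UnsupportedOperationException("not implemented in this sketch");
    }
}
```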
Force-pushed from cb8a552 to b8e19cf
@mosabua Could you please review the documentation for this type support? Thank you!
Docs need some clarification and refinement. Great start and thanks for not ignoring the docs @adamjshook
@@ -88,6 +88,7 @@ Property name Description
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not.
``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_``
``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``.
``kafka.protobuf-any-support-enabled`` True to enable support for encoding Protobuf ``any`` types to JSON, defaults to ``false``.
How about: "Enable support for encoding Protobuf any types to JSON by setting to the property to ``true``, defaults to ``false``."
===================================== =======================================

any
+++++
Make the underline the same length as the title.
Protobuf schemas containing `google.protobuf.Any <https://protobuf.dev/programming-guides/proto3/#any>`_
fields are mapped to a ``JSON`` field in Trino when
``kafka.protobuf-any-support-enabled`` is set to ``true``. Any message types
This reads weird .. how about: "Message types with an Any field contain an arbitrary ..."
Force-pushed from b8e19cf to 33d0809
@mosabua I've made updates to the documentation, please give it another review and let me know of any changes. Thank you!
One last nit, but essentially the docs are good to go.
    google.protobuf.Any any_message = 1;
}

The corresponding Trino column will be named ``any_message`` of type ``JSON``
Suggested change:
- The corresponding Trino column will be named ``any_message`` of type ``JSON``
+ The corresponding Trino column is named ``any_message`` of type ``JSON``
Force-pushed from 33d0809 to a75a191
Force-pushed from a75a191 to 9c94e5e
LGTM % some docs comments
@@ -88,6 +88,8 @@ Property name Description
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not.
``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_``
``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``.
``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf any types to JSON by setting to the property to ``true``,
Suggested change:
- ``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf any types to JSON by setting to the property to ``true``,
+ ``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf ``any`` types to ``JSON`` by setting to the property to ``true``,
Message types with an `Any <https://protobuf.dev/programming-guides/proto3/#any>`_
field contain an arbitrary serialized message as bytes and a type URL to resolve
that message's type, typically with a scheme of ``file://`` or ``http://``.
Are schemes other than `file://`, `http://`, or `https://` supported? If not, let's word this more explicitly to say exactly what schemes are supported.
No, I've changed the docs to be more clear.
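To make the type URL behavior concrete, here is a small standalone sketch, not taken from the PR, of how a producer might pack a message into an `Any` field; the prefix `https://example.com/descriptors` is a made-up example:

```java
import com.google.protobuf.Any;
import com.google.protobuf.StringValue;

public class AnyPackExample
{
    public static void main(String[] args)
    {
        // Pack a well-known message type into an Any, using a custom type URL prefix
        // (the prefix is hypothetical; in practice it would point at a resolvable descriptor location)
        Any any = Any.pack(StringValue.of("hello"), "https://example.com/descriptors");

        // The type URL ends with the fully qualified message name, which the connector
        // uses to resolve the message's descriptor
        System.out.println(any.getTypeUrl());
        // prints: https://example.com/descriptors/google.protobuf.StringValue

        // The packed message itself travels as raw bytes in the Any's value field
        System.out.println(any.getValue().size());
    }
}
```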
Force-pushed from 9c94e5e to 30577b2
Force-pushed from 30577b2 to 47be527
Force-pushed from 47be527 to 29a2d3b
Rebased to resolve conflicts. Will merge after CI.
Description
This adds support for the `any` Protobuf data type, converting the message type to JSON. The default implementation uses the `ProtobufUtils` mechanism for parsing the `Descriptor`. There is a separate implementation that uses Confluent's `ProtobufSchemaProvider`, which is used instead when the `ConfluentModule` is enabled and uses caching when accessing the types.

This PR needs user-facing documentation, which will be added as another commit.
Additional context and related issues
The Protobuf `any` type contains two fields: a `type_url` field and another containing the serialized Protobuf message. The `type_url` is expected to be a URL that resolves to a serialized descriptor of the message. This change uses a `java.net.URL` built from the `type_url` to open a stream and read the bytes of the descriptor, which is then converted into a `Descriptor` object. We then pass this to a JSON printer to serialize the message. Afterwards, we have to sort all of the keys, including nested keys, as the Trino JSON type expects all keys to be ordered.

This requires users to ensure that the URL is resolvable from the coordinator and worker hosts, either as a local `file://` path present on each machine or as a remote `https://` service.
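As a rough illustration of the approach described above, and not the PR's actual code, resolving the `type_url`, building a `Descriptor`, and printing the packed message as JSON could look roughly like this. It assumes the URL returns a serialized `FileDescriptorSet` containing a single dependency-free file, and the class and helper names are made up:

```java
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;

import java.io.InputStream;
import java.net.URL;

public class AnyToJsonSketch
{
    // Resolve the serialized descriptor behind the type URL, e.g. a file:// or https:// location
    static Descriptor resolveDescriptor(String typeUrl)
            throws Exception
    {
        byte[] bytes;
        try (InputStream in = new URL(typeUrl).openStream()) {
            bytes = in.readAllBytes();
        }
        // Assumption for this sketch: the URL returns a serialized FileDescriptorSet with a
        // single file and no dependencies; the real implementation handles more cases
        FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(bytes);
        FileDescriptorProto fileProto = descriptorSet.getFile(0);
        FileDescriptor file = FileDescriptor.buildFrom(fileProto, new FileDescriptor[] {});
        // Assumption: the last path segment of the type URL is the fully qualified message name
        String messageName = typeUrl.substring(typeUrl.lastIndexOf('/') + 1);
        String simpleName = messageName.substring(messageName.lastIndexOf('.') + 1);
        return file.findMessageTypeByName(simpleName);
    }

    static String toJson(Any any)
            throws Exception
    {
        Descriptor descriptor = resolveDescriptor(any.getTypeUrl());
        DynamicMessage message = DynamicMessage.parseFrom(descriptor, any.getValue());
        // The connector additionally sorts all keys, including nested ones, before handing
        // the JSON to Trino; that step is omitted here for brevity
        return JsonFormat.printer().print(message);
    }
}
```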
Expands on #16836, which added `oneof` support.
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text: