From 360f614fd788a8b3ef285bfd8ce80cec0e21f166 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Sun, 17 Nov 2024 21:07:38 -0500 Subject: [PATCH 1/2] datalake: manage protobuf descriptor lifetimes correctly --- src/v/datalake/record_schema_resolver.cc | 12 ++++++------ src/v/datalake/record_schema_resolver.h | 11 ++++++++--- src/v/datalake/record_translator.cc | 5 +++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/v/datalake/record_schema_resolver.cc b/src/v/datalake/record_schema_resolver.cc index f760bafee8e83..3134737b91f3b 100644 --- a/src/v/datalake/record_schema_resolver.cc +++ b/src/v/datalake/record_schema_resolver.cc @@ -39,7 +39,7 @@ struct schema_translating_visitor { ppsr::schema_id id; ss::future> - operator()(const ppsr::avro_schema_definition& avro_def) { + operator()(ppsr::avro_schema_definition&& avro_def) { const auto& avro_schema = avro_def(); try { auto result = datalake::type_to_iceberg(avro_schema.root()); @@ -70,7 +70,7 @@ struct schema_translating_visitor { } } ss::future> - operator()(const ppsr::protobuf_schema_definition& pb_def) { + operator()(ppsr::protobuf_schema_definition&& pb_def) { const google::protobuf::Descriptor* d; proto_offsets_message_data offsets; try { @@ -96,7 +96,7 @@ struct schema_translating_visitor { auto type = type_to_iceberg(*d).value(); co_return type_and_buf{ .type = resolved_type{ - .schema = *d, + .schema = wrapped_protobuf_descriptor { *d, std::move(pb_def) }, .id = {.schema_id = id, .protobuf_offsets = std::move(offsets.protobuf_offsets)}, .type = std::move(type), .type_name = d->name(), @@ -112,7 +112,7 @@ struct schema_translating_visitor { } } ss::future> - operator()(const ppsr::json_schema_definition&) { + operator()(ppsr::json_schema_definition&&) { co_return type_and_buf::make_raw_binary(std::move(buf_no_id)); } }; @@ -182,8 +182,8 @@ record_schema_resolver::resolve_buf_type(iobuf b) const { co_return errc::registry_error; } auto resolved_schema = std::move(schema_fut.get()); - 
co_return co_await resolved_schema.visit( - schema_translating_visitor{std::move(buf_no_id), schema_id}); + co_return co_await std::move(resolved_schema) + .visit(schema_translating_visitor{std::move(buf_no_id), schema_id}); } } // namespace datalake diff --git a/src/v/datalake/record_schema_resolver.h b/src/v/datalake/record_schema_resolver.h index c1a83a6369e04..ce201d6ee2666 100644 --- a/src/v/datalake/record_schema_resolver.h +++ b/src/v/datalake/record_schema_resolver.h @@ -26,13 +26,18 @@ class Descriptor; namespace datalake { +struct wrapped_protobuf_descriptor { + std::reference_wrapper descriptor; + // `descriptor` references an object owned by `schema_def`. + pandaproxy::schema_registry::protobuf_schema_definition schema_def; +}; + // Represents an object that can be converted into an Iceberg schema. // NOTE: these aren't exactly just the schemas from the registry: Protobuf // schemas are FileDescriptors in the registry rather than Descriptors, and // require additional information to get the Descriptors. -using resolved_schema = std::variant< - std::reference_wrapper, - avro::ValidSchema>; +using resolved_schema + = std::variant; struct resolved_type { // The resolved schema that corresponds to the type. 
diff --git a/src/v/datalake/record_translator.cc b/src/v/datalake/record_translator.cc index f8be6728a9f6e..fbb6cbada9a4a 100644 --- a/src/v/datalake/record_translator.cc +++ b/src/v/datalake/record_translator.cc @@ -12,6 +12,7 @@ #include "base/vlog.h" #include "datalake/conversion_outcome.h" #include "datalake/logger.h" +#include "datalake/record_schema_resolver.h" #include "datalake/table_definition.h" #include "datalake/values_avro.h" #include "datalake/values_protobuf.h" @@ -32,8 +33,8 @@ struct value_translating_visitor { const iceberg::field_type& type; ss::future - operator()(const google::protobuf::Descriptor& d) { - return deserialize_protobuf(std::move(parsable_buf), d); + operator()(const wrapped_protobuf_descriptor& d) { + return deserialize_protobuf(std::move(parsable_buf), d.descriptor); } ss::future operator()(const avro::ValidSchema& s) { auto value = co_await deserialize_avro(std::move(parsable_buf), s); From ca9b97ff0c2c92a704a271702a2b6d7d80d48bbe Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Mon, 18 Nov 2024 01:02:24 -0500 Subject: [PATCH 2/2] pandaproxy: add documentation to descriptor function --- src/v/pandaproxy/schema_registry/protobuf.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/protobuf.h b/src/v/pandaproxy/schema_registry/protobuf.h index 763bfcac8c7a3..77a115d706eb3 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.h +++ b/src/v/pandaproxy/schema_registry/protobuf.h @@ -34,6 +34,11 @@ compatibility_result check_compatible( const protobuf_schema_definition& writer, verbose is_verbose = verbose::no); +///\brief Returns a reference to the `Descriptor` at the offset specified by +///`fields`. +/// Note that the returned reference is to an object owned by +/// `protobuf_schema_definition` and therefore should only be used while that +/// object is alive. ::result< std::reference_wrapper, kafka::error_code>