Skip to content

Commit

Permalink
Merge pull request #24154 from ballard26/iceberg-microbench
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj authored Nov 18, 2024
2 parents b083827 + ca9b97f commit 1102318
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 11 deletions.
12 changes: 6 additions & 6 deletions src/v/datalake/record_schema_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct schema_translating_visitor {
ppsr::schema_id id;

ss::future<checked<type_and_buf, type_resolver::errc>>
operator()(const ppsr::avro_schema_definition& avro_def) {
operator()(ppsr::avro_schema_definition&& avro_def) {
const auto& avro_schema = avro_def();
try {
auto result = datalake::type_to_iceberg(avro_schema.root());
Expand Down Expand Up @@ -70,7 +70,7 @@ struct schema_translating_visitor {
}
}
ss::future<checked<type_and_buf, type_resolver::errc>>
operator()(const ppsr::protobuf_schema_definition& pb_def) {
operator()(ppsr::protobuf_schema_definition&& pb_def) {
const google::protobuf::Descriptor* d;
proto_offsets_message_data offsets;
try {
Expand All @@ -96,7 +96,7 @@ struct schema_translating_visitor {
auto type = type_to_iceberg(*d).value();
co_return type_and_buf{
.type = resolved_type{
.schema = *d,
.schema = wrapped_protobuf_descriptor { *d, std::move(pb_def) },
.id = {.schema_id = id, .protobuf_offsets = std::move(offsets.protobuf_offsets)},
.type = std::move(type),
.type_name = d->name(),
Expand All @@ -112,7 +112,7 @@ struct schema_translating_visitor {
}
}
ss::future<checked<type_and_buf, type_resolver::errc>>
operator()(const ppsr::json_schema_definition&) {
operator()(ppsr::json_schema_definition&&) {
co_return type_and_buf::make_raw_binary(std::move(buf_no_id));
}
};
Expand Down Expand Up @@ -182,8 +182,8 @@ record_schema_resolver::resolve_buf_type(iobuf b) const {
co_return errc::registry_error;
}
auto resolved_schema = std::move(schema_fut.get());
co_return co_await resolved_schema.visit(
schema_translating_visitor{std::move(buf_no_id), schema_id});
co_return co_await std::move(resolved_schema)
.visit(schema_translating_visitor{std::move(buf_no_id), schema_id});
}

} // namespace datalake
11 changes: 8 additions & 3 deletions src/v/datalake/record_schema_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ class Descriptor;

namespace datalake {

struct wrapped_protobuf_descriptor {
std::reference_wrapper<const google::protobuf::Descriptor> descriptor;
// `descriptor` references an object owned by `schema_def`.
pandaproxy::schema_registry::protobuf_schema_definition schema_def;
};

// Represents an object that can be converted into an Iceberg schema.
// NOTE: these aren't exactly just the schemas from the registry: Protobuf
// schemas are FileDescriptors in the registry rather than Descriptors, and
// require additional information to get the Descriptors.
using resolved_schema = std::variant<
std::reference_wrapper<const google::protobuf::Descriptor>,
avro::ValidSchema>;
using resolved_schema
= std::variant<wrapped_protobuf_descriptor, avro::ValidSchema>;

struct resolved_type {
// The resolved schema that corresponds to the type.
Expand Down
5 changes: 3 additions & 2 deletions src/v/datalake/record_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "base/vlog.h"
#include "datalake/conversion_outcome.h"
#include "datalake/logger.h"
#include "datalake/record_schema_resolver.h"
#include "datalake/table_definition.h"
#include "datalake/values_avro.h"
#include "datalake/values_protobuf.h"
Expand All @@ -32,8 +33,8 @@ struct value_translating_visitor {
const iceberg::field_type& type;

ss::future<optional_value_outcome>
operator()(const google::protobuf::Descriptor& d) {
return deserialize_protobuf(std::move(parsable_buf), d);
operator()(const wrapped_protobuf_descriptor& d) {
return deserialize_protobuf(std::move(parsable_buf), d.descriptor);
}
ss::future<optional_value_outcome> operator()(const avro::ValidSchema& s) {
auto value = co_await deserialize_avro(std::move(parsable_buf), s);
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ compatibility_result check_compatible(
const protobuf_schema_definition& writer,
verbose is_verbose = verbose::no);

///\brief Returns a reference to the `Descriptor` at the offset specified by
///`fields`.
/// Note that the returned reference to is an object owned by
/// `protobuf_schema_definition` and therefore should only be used while that
/// object is alive.
::result<
std::reference_wrapper<const google::protobuf::Descriptor>,
kafka::error_code>
Expand Down

0 comments on commit 1102318

Please sign in to comment.