Skip to content

Commit

Permalink
schema_registry/protobuf: Support ?normalize=true
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jan 9, 2025
1 parent 0ee8c73 commit 603b3d7
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 46 deletions.
1 change: 1 addition & 0 deletions src/v/pandaproxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ redpanda_cc_library(
"@abseil-cpp//absl/container:node_hash_map",
"@avro",
"@boost//:algorithm",
"@boost//:container",
"@boost//:graph",
"@boost//:lexical_cast",
"@boost//:math",
Expand Down
161 changes: 128 additions & 33 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pandaproxy/schema_registry/compatibility.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
#include "src/v/pandaproxy/schema_registry/protobuf/confluent/meta.pb.h"
#include "src/v/pandaproxy/schema_registry/protobuf/confluent/types/decimal.pb.h"
#include "src/v/pandaproxy/schema_registry/protobuf/google/type/calendar_period.pb.h"
Expand Down Expand Up @@ -46,11 +47,15 @@

#include <absl/container/flat_hash_set.h>
#include <boost/algorithm/string/trim.hpp>
#include <boost/container/flat_set.hpp>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <google/protobuf/any.pb.h>
#include <google/protobuf/api.pb.h>
#include <google/protobuf/compiler/parser.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/descriptor_database.h>
#include <google/protobuf/duration.pb.h>
#include <google/protobuf/empty.pb.h>
#include <google/protobuf/field_mask.pb.h>
Expand Down Expand Up @@ -488,6 +493,7 @@ struct protobuf_schema_definition::impl {
pb::DescriptorPool _dp;
const pb::FileDescriptor* fd{};
pb::FileDescriptorProto fdp{};
normalize is_normalized{normalize::no};
protobuf_renderer_v2 v2_renderer{protobuf_renderer_v2::no};

/**
Expand Down Expand Up @@ -549,7 +555,13 @@ struct protobuf_schema_definition::impl {
return canonical_schema_definition::raw_string{std::move(buf)};
}
iobuf_ostream osb;
render_proto(osb.ostream(), fdp, *fd);
if (is_normalized) {
pb::FileDescriptorProto tmp_fdp;
fd->CopyTo(&tmp_fdp);
render_proto(osb.ostream(), std::move(tmp_fdp), *fd);
} else {
render_proto(osb.ostream(), fdp, *fd);
}
return canonical_schema_definition::raw_string{std::move(osb).buf()};
}

Expand Down Expand Up @@ -590,6 +602,20 @@ struct protobuf_schema_definition::impl {
std::variant<Range, std::reference_wrapper<const Range>> _r;
};

/**
 * Wrap `range` for rendering.
 *
 * When the schema is not normalized the proxy is built directly from the
 * caller's `range`. When it is normalized, a copy of `range` is sorted with
 * `comp` applied to `proj` of each element, and the proxy takes ownership of
 * that sorted copy.
 *
 * \param range elements to render
 * \param comp ordering used when normalized (defaults to std::ranges::less)
 * \param proj projection applied before comparison (defaults to identity)
 */
template<
  std::ranges::random_access_range Range,
  typename Comp = std::ranges::less,
  typename Proj = std::identity>
const range_proxy<Range> maybe_sorted(
  const Range& range, Comp comp = Comp{}, Proj proj = Proj{}) const {
    if (is_normalized) {
        // Sort a private copy; the caller's range is never reordered.
        std::decay_t<Range> ordered(range);
        std::ranges::sort(ordered, comp, proj);
        return range_proxy<Range>{std::move(ordered)};
    }
    return range_proxy<Range>{range};
}

void render_field(
std::ostream& os,
pb::Edition edition,
Expand Down Expand Up @@ -664,9 +690,11 @@ struct protobuf_schema_definition::impl {
[&](const pb::FieldDescriptor* fd) -> std::string_view {
switch (fd->type()) {
case pb::FieldDescriptor::TYPE_MESSAGE:
return fd->message_type()->name();
return is_normalized ? fd->message_type()->full_name()
: fd->message_type()->name();
case pb::FieldDescriptor::TYPE_ENUM:
return fd->enum_type()->name();
return is_normalized ? fd->enum_type()->full_name()
: fd->enum_type()->name();
default:
return fd->type_name();
};
Expand Down Expand Up @@ -751,7 +779,13 @@ struct protobuf_schema_definition::impl {
},
field_options());

for (const auto& option : options.uninterpreted_option()) {
auto uninterpreted_options = maybe_sorted(
options.uninterpreted_option(),
std::less{},
[](const auto& option) {
return fmt::format("{}", fmt::join(option.name(), ""));
});
for (const auto& option : uninterpreted_options) {
maybe_print_seperator();
fmt::print(os, "{}", option);
}
Expand Down Expand Up @@ -811,39 +845,52 @@ struct protobuf_schema_definition::impl {
message.options().no_standard_descriptor_accessor());
}
}
for (const auto& option : message.options().uninterpreted_option()) {
auto uninterepreted_options = maybe_sorted(
message.options().uninterpreted_option(),
std::less{},
[](const auto& option) {
return fmt::format("{}", fmt::join(option.name(), ""));
});
for (const auto& option : uninterepreted_options) {
fmt::print(os, "{:{}}option {};\n", "", indent + 2, option);
}

for (const auto& value : message.reserved_range()) {
auto reserved_range = maybe_sorted(
message.reserved_range(),
std::less{},
&pb::DescriptorProto_ReservedRange::start);
for (const auto& value : reserved_range) {
fmt::print(os, "{:{}}reserved {}", "", indent + 2, value.start());
if (value.has_end() && value.end() != value.start() + 1) {
fmt::print(os, " to {}", value.end() - 1);
}
fmt::print(os, ";\n");
}
if (message.reserved_name_size() != 0) {
auto reserved_names = maybe_sorted(message.reserved_name());
if (!reserved_names.empty()) {
fmt::print(
os,
"{:{}}reserved \"{}\";\n",
"",
indent + 2,
fmt::join(message.reserved_name(), "\", \""));
fmt::join(reserved_names, "\", \""));
}
if (
message.reserved_range_size() != 0
|| message.reserved_name_size() != 0) {
if (!reserved_range.empty() || !reserved_names.empty()) {
fmt::print(os, "\n");
}

bool has_fields = false;
auto fields = maybe_sorted(
message.field(),
std::ranges::less{},
&pb::FieldDescriptorProto::number);

// Each oneof section needs to start with the lowest field number, which
// may be different to the order of oneof_decl in the message.
std::vector<int> oneofs;

// Render non oneof fields, and record correct order of oneof indices.
for (const auto& field : message.field()) {
for (const auto& field : fields) {
if (
field.has_oneof_index()
&& !(field.has_proto3_optional() && field.proto3_optional())) {
Expand Down Expand Up @@ -918,14 +965,18 @@ struct protobuf_schema_definition::impl {
const pb::EnumDescriptorProto& enum_proto,
int indent) const {
fmt::print(os, "{:{}}enum {} {{\n", "", indent, enum_proto.name());
for (const auto& value : enum_proto.reserved_range()) {
auto reserved_range = maybe_sorted(
enum_proto.reserved_range(),
std::less{},
&pb::EnumDescriptorProto_EnumReservedRange::start);
for (const auto& value : reserved_range) {
fmt::print(os, "{:{}}reserved {}", "", indent + 2, value.start());
if (value.has_end() && value.end() != value.start()) {
fmt::print(os, " to {}", value.end());
}
fmt::print(os, ";\n");
}
for (const auto& value : enum_proto.reserved_name()) {
for (const auto& value : maybe_sorted(enum_proto.reserved_name())) {
fmt::print(os, "{:{}}reserved \"{}\";\n", "", indent + 2, value);
}
if (enum_proto.options().has_allow_alias()) {
Expand All @@ -948,6 +999,12 @@ struct protobuf_schema_definition::impl {
fmt::print(os, "{:{}}option {};\n", "", indent + 2, option);
}
std::optional<std::decay_t<decltype(enum_proto.value())>> values;
if (is_normalized) {
values = enum_proto.value();
std::ranges::sort(values.value(), std::less{}, [](const auto& v) {
return std::pair<int, std::string_view>{v.number(), v.name()};
});
}
for (const auto& value : values.value_or(enum_proto.value())) {
fmt::print(
os, "{:{}}{} = {}", "", indent + 2, value.name(), value.number());
Expand Down Expand Up @@ -998,8 +1055,13 @@ struct protobuf_schema_definition::impl {
indent + 2,
service.options().deprecated());
}
for (const auto& option :
service.options().uninterpreted_option()) {
auto uninterpreted_options = maybe_sorted(
service.options().uninterpreted_option(),
std::less{},
[](const pb::UninterpretedOption& o) {
return fmt::format("{}", fmt::join(o.name(), ""));
});
for (const auto& option : uninterpreted_options) {
fmt::print(os, "{:{}}option {};\n", "", indent + 2, option);
}
}
Expand Down Expand Up @@ -1032,8 +1094,13 @@ struct protobuf_schema_definition::impl {
pb::MethodOptions_IdempotencyLevel_Name(
method.options().idempotency_level()));
}
for (const auto& option :
method.options().uninterpreted_option()) {
auto uninterpreted_options = maybe_sorted(
method.options().uninterpreted_option(),
std::less{},
[](const pb::UninterpretedOption& o) {
return fmt::format("{}", fmt::join(o.name(), ""));
});
for (const auto& option : uninterpreted_options) {
fmt::print(os, "{:{}}option {};\n", "", indent + 2, option);
}
}
Expand Down Expand Up @@ -1109,7 +1176,13 @@ struct protobuf_schema_definition::impl {
if (options.has_py_generic_services()) {
printv("py_generic_services", options.py_generic_services());
}
for (const auto& option : options.uninterpreted_option()) {
auto uninterpreted_options = maybe_sorted(
options.uninterpreted_option(),
std::less{},
[](const pb::UninterpretedOption& o) {
return fmt::format("{}", fmt::join(o.name(), ""));
});
for (const auto& option : uninterpreted_options) {
first_option = false;
fmt::print(os, "option {};\n", option);
}
Expand All @@ -1136,17 +1209,33 @@ struct protobuf_schema_definition::impl {
};

// return a range that matches the predicate
constexpr auto partition = [](auto begin, auto end, auto pred) {
return std::ranges::subrange(
begin, std::stable_partition(begin, end, pred));
};
constexpr auto partition =
[](auto begin, auto end, auto pred, normalize norm) {
return std::ranges::subrange(
begin,
norm ? std::partition(begin, end, pred)
: std::stable_partition(begin, end, pred));
};

auto public_deps = partition(
all_deps.begin(), all_deps.end(), is_public);
auto weak_deps = partition(public_deps.end(), all_deps.end(), is_weak);
all_deps.begin(), all_deps.end(), is_public, is_normalized);
auto weak_deps = partition(
public_deps.end(), all_deps.end(), is_weak, is_normalized);
auto private_deps = std::ranges::subrange(
weak_deps.end(), all_deps.end());

if (is_normalized) {
constexpr auto sort_and_unique = [](auto& deps) {
std::ranges::sort(deps);
deps = std::ranges::subrange(
deps.begin(), std::ranges::unique(deps).begin());
};

sort_and_unique(public_deps);
sort_and_unique(weak_deps);
sort_and_unique(private_deps);
}

auto print_deps = [&](const auto& view, std::string_view type) {
for (const auto& dep : view) {
fmt::print(os, "import {}\"{}\";\n", type, dep);
Expand Down Expand Up @@ -1264,27 +1353,33 @@ operator<<(std::ostream& os, const protobuf_schema_definition& def) {
return os;
}

ss::future<protobuf_schema_definition>
make_protobuf_schema_definition(schema_getter& store, canonical_schema schema) {
/// Build a protobuf_schema_definition for `schema`.
///
/// The schema is imported into the impl's own descriptor pool via
/// import_schema, and the resulting file descriptor is looked up by the
/// imported proto's name. The schema's references are carried over into the
/// returned definition.
///
/// \param store getter passed to import_schema
/// \param schema canonical schema to import; consumed by this call
/// \param norm recorded on the impl as is_normalized; when set, the
///        references are also sorted and de-duplicated
ss::future<protobuf_schema_definition> make_protobuf_schema_definition(
schema_getter& store, canonical_schema schema, normalize norm) {
// Copy the references first: `schema` is moved into import_schema below.
auto refs = schema.def().refs();
auto impl = ss::make_shared<protobuf_schema_definition::impl>();
impl->fdp = co_await import_schema(impl->_dp, store, std::move(schema));
impl->fd = impl->_dp.FindFileByName(impl->fdp.name());
// When the getter is a sharded_store, adopt its v2 renderer setting;
// otherwise the impl keeps its default-initialised value.
if (auto* s = dynamic_cast<const sharded_store*>(&store); s != nullptr) {
impl->v2_renderer = s->protobuf_v2_renderer();
}
impl->is_normalized = norm;
if (norm) {
// std::ranges::unique only collapses adjacent duplicates, so sort first,
// then erase the leftover tail it returns.
std::sort(refs.begin(), refs.end());
auto uniq = std::ranges::unique(refs);
refs.erase(uniq.begin(), uniq.end());
}
co_return protobuf_schema_definition{std::move(impl), std::move(refs)};
}

ss::future<canonical_schema_definition>
validate_protobuf_schema(sharded_store& store, canonical_schema schema) {
ss::future<canonical_schema_definition> validate_protobuf_schema(
sharded_store& store, canonical_schema schema, normalize norm) {
auto res = co_await make_protobuf_schema_definition(
store, std::move(schema));
store, std::move(schema), norm);
co_return canonical_schema_definition{std::move(res)};
}

ss::future<canonical_schema>
make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) {
ss::future<canonical_schema> make_canonical_protobuf_schema(
sharded_store& store, unparsed_schema schema, normalize norm) {
auto [sub, unparsed] = std::move(schema).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();
canonical_schema temp{
Expand All @@ -1295,7 +1390,7 @@ make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) {

co_return canonical_schema{
std::move(sub),
co_await validate_protobuf_schema(store, std::move(temp))};
co_await validate_protobuf_schema(store, std::move(temp), norm)};
}

namespace {
Expand Down
20 changes: 12 additions & 8 deletions src/v/pandaproxy/schema_registry/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ class Descriptor;

namespace pandaproxy::schema_registry {

ss::future<protobuf_schema_definition>
make_protobuf_schema_definition(schema_getter& store, canonical_schema schema);

ss::future<canonical_schema_definition>
validate_protobuf_schema(sharded_store& store, canonical_schema schema);

ss::future<canonical_schema>
make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema);
/// Import `schema` and build a protobuf_schema_definition from it.
///
/// \param norm when set, the definition is flagged as normalized and its
///        references are sorted and de-duplicated. Defaults to normalize::no.
ss::future<protobuf_schema_definition> make_protobuf_schema_definition(
schema_getter& store,
canonical_schema schema,
normalize norm = normalize::no);

/// Build the protobuf definition for `schema` via
/// make_protobuf_schema_definition (forwarding `norm`) and return it as a
/// canonical_schema_definition.
ss::future<canonical_schema_definition> validate_protobuf_schema(
sharded_store& store,
canonical_schema schema,
normalize norm = normalize::no);

/// Produce a canonical_schema from an unparsed one; the definition comes from
/// validate_protobuf_schema with `norm` forwarded.
ss::future<canonical_schema> make_canonical_protobuf_schema(
sharded_store& store, unparsed_schema schema, normalize norm = normalize::no);

compatibility_result check_compatible(
const protobuf_schema_definition& reader,
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ sharded_store::make_canonical_schema(unparsed_schema schema, normalize norm) {
}
case schema_type::protobuf:
co_return co_await make_canonical_protobuf_schema(
*this, std::move(schema));
*this, std::move(schema), norm);
case schema_type::json:
co_return co_await make_canonical_json_schema(
*this, std::move(schema), norm);
Expand Down
Loading

0 comments on commit 603b3d7

Please sign in to comment.