diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 9078da396c32..d249bdeb03b1 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3488,6 +3488,12 @@ configuration::configuration() "Normalize schemas as they are read from the topic on startup.", {.needs_restart = needs_restart::yes, .visibility = visibility::user}, false) + , schema_registry_protobuf_renderer_v2( + *this, + "schema_registry_protobuf_renderer_v2", + "Enables experimental protobuf renderer to support normalize=true.", + {.needs_restart = needs_restart::yes, .visibility = visibility::tunable}, + false) , pp_sr_smp_max_non_local_requests( *this, "pp_sr_smp_max_non_local_requests", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 32c541159505..490bd0d0fed8 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -663,6 +663,7 @@ struct configuration final : public config_store { config::property kafka_schema_id_validation_cache_capacity; property schema_registry_normalize_on_startup; + property schema_registry_protobuf_renderer_v2; property> pp_sr_smp_max_non_local_requests; bounded_property max_in_flight_schema_registry_requests_per_shard; bounded_property max_in_flight_pandaproxy_requests_per_shard; diff --git a/src/v/pandaproxy/schema_registry/api.cc b/src/v/pandaproxy/schema_registry/api.cc index 092996683d91..fb45bd9628e1 100644 --- a/src/v/pandaproxy/schema_registry/api.cc +++ b/src/v/pandaproxy/schema_registry/api.cc @@ -17,6 +17,7 @@ #include "pandaproxy/schema_registry/schema_id_cache.h" #include "pandaproxy/schema_registry/service.h" #include "pandaproxy/schema_registry/sharded_store.h" +#include "pandaproxy/schema_registry/types.h" #include "pandaproxy/schema_registry/validation_metrics.h" #include @@ -44,7 +45,8 @@ api::api( api::~api() noexcept = default; ss::future<> api::start() { - _store = std::make_unique(); + _store = std::make_unique(protobuf_renderer_v2{ + config::shard_local_cfg().schema_registry_protobuf_renderer_v2}); co_await _store->start(is_mutable(_cfg.mode_mutability), _sg); co_await _schema_id_validation_probe.start(); co_await _schema_id_validation_probe.invoke_on_all( diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index a0ff8f5244bd..a80e789016a7 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -324,6 +324,7 @@ ss::future import_schema( struct protobuf_schema_definition::impl { pb::DescriptorPool _dp; const pb::FileDescriptor* fd{}; + protobuf_renderer_v2 v2_renderer{protobuf_renderer_v2::no}; /** * debug_string swaps the order of the import and package lines that @@ -434,6 +435,9 @@ make_protobuf_schema_definition(schema_getter& store, canonical_schema schema) { auto impl = ss::make_shared(); auto refs = schema.def().refs(); impl->fd = co_await import_schema(impl->_dp, store, std::move(schema)); + if (auto* s = dynamic_cast(&store); s != nullptr) { + impl->v2_renderer = s->protobuf_v2_renderer(); + } co_return protobuf_schema_definition{std::move(impl), std::move(refs)}; } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 5b479d6f5a09..88cbb3880d73 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -25,6 +25,9 @@ class store; /// subject or schema_id class sharded_store final : public schema_getter { public: + explicit sharded_store( + protobuf_renderer_v2 v2_renderer = protobuf_renderer_v2::no) + : _v2_renderer(v2_renderer) {} ~sharded_store() override = default; ss::future<> start(is_mutable mut, ss::smp_service_group sg); ss::future<> stop(); @@ -198,6 +201,10 @@ class sharded_store final : public schema_getter { //// \brief Throw if the store is not mutable void check_mode_mutability(force f) const; + //// \brief Whether to use the experimental v2 protobuf renderer to support + //// normalize=true; + protobuf_renderer_v2 protobuf_v2_renderer() const { return _v2_renderer; } + private: ss::future do_is_compatible( schema_version version, canonical_schema new_schema, verbose is_verbose); @@ -228,6 +235,7 @@ class sharded_store final : public schema_getter { ///\brief Access must occur only on shard 0. schema_id _next_schema_id{1}; + protobuf_renderer_v2 _v2_renderer; }; } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc index 338cf1be0115..2e2d14caea54 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc @@ -31,7 +31,9 @@ namespace pps = pp::schema_registry; namespace { struct simple_sharded_store { - simple_sharded_store() { + explicit simple_sharded_store( + pps::protobuf_renderer_v2 proto_v2 = pps::protobuf_renderer_v2::no) + : store{proto_v2} { store.start(pps::is_mutable::yes, ss::default_smp_service_group()) .get(); } diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 5eabd02e3ac4..e9a928baecd6 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -37,6 +37,7 @@ using default_to_global = ss::bool_class; using force = ss::bool_class; using normalize = ss::bool_class; using verbose = ss::bool_class; +using protobuf_renderer_v2 = ss::bool_class; template std::enable_if_t, std::optional>