Skip to content

Commit

Permalink
schema_registry: Introduce config for protobuf_renderer_v2
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jan 9, 2025
1 parent 589701f commit 85998a3
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3488,6 +3488,12 @@ configuration::configuration()
"Normalize schemas as they are read from the topic on startup.",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
false)
, schema_registry_protobuf_renderer_v2(
*this,
"schema_registry_protobuf_renderer_v2",
"Enables experimental protobuf renderer to support normalize=true.",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
false)
, pp_sr_smp_max_non_local_requests(
*this,
"pp_sr_smp_max_non_local_requests",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ struct configuration final : public config_store {
config::property<size_t> kafka_schema_id_validation_cache_capacity;

property<bool> schema_registry_normalize_on_startup;
property<bool> schema_registry_protobuf_renderer_v2;
property<std::optional<uint32_t>> pp_sr_smp_max_non_local_requests;
bounded_property<size_t> max_in_flight_schema_registry_requests_per_shard;
bounded_property<size_t> max_in_flight_pandaproxy_requests_per_shard;
Expand Down
4 changes: 3 additions & 1 deletion src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "pandaproxy/schema_registry/schema_id_cache.h"
#include "pandaproxy/schema_registry/service.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
#include "pandaproxy/schema_registry/validation_metrics.h"

#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -44,7 +45,8 @@ api::api(
api::~api() noexcept = default;

ss::future<> api::start() {
_store = std::make_unique<sharded_store>();
_store = std::make_unique<sharded_store>(protobuf_renderer_v2{
config::shard_local_cfg().schema_registry_protobuf_renderer_v2});
co_await _store->start(is_mutable(_cfg.mode_mutability), _sg);
co_await _schema_id_validation_probe.start();
co_await _schema_id_validation_probe.invoke_on_all(
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ ss::future<const pb::FileDescriptor*> import_schema(
struct protobuf_schema_definition::impl {
pb::DescriptorPool _dp;
const pb::FileDescriptor* fd{};
protobuf_renderer_v2 v2_renderer{protobuf_renderer_v2::no};

/**
* debug_string swaps the order of the import and package lines that
Expand Down Expand Up @@ -434,6 +435,9 @@ make_protobuf_schema_definition(schema_getter& store, canonical_schema schema) {
auto impl = ss::make_shared<protobuf_schema_definition::impl>();
auto refs = schema.def().refs();
impl->fd = co_await import_schema(impl->_dp, store, std::move(schema));
if (auto* s = dynamic_cast<const sharded_store*>(&store); s != nullptr) {
impl->v2_renderer = s->protobuf_v2_renderer();
}
co_return protobuf_schema_definition{std::move(impl), std::move(refs)};
}

Expand Down
8 changes: 8 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class store;
/// subject or schema_id
class sharded_store final : public schema_getter {
public:
explicit sharded_store(
protobuf_renderer_v2 v2_renderer = protobuf_renderer_v2::no)
: _v2_renderer(v2_renderer) {}
~sharded_store() override = default;
ss::future<> start(is_mutable mut, ss::smp_service_group sg);
ss::future<> stop();
Expand Down Expand Up @@ -198,6 +201,10 @@ class sharded_store final : public schema_getter {
//// \brief Throw if the store is not mutable
void check_mode_mutability(force f) const;

//// \brief Whether to use the experimental v2 protobuf renderer to support
//// normalize=true;
protobuf_renderer_v2 protobuf_v2_renderer() const { return _v2_renderer; }

private:
ss::future<compatibility_result> do_is_compatible(
schema_version version, canonical_schema new_schema, verbose is_verbose);
Expand Down Expand Up @@ -228,6 +235,7 @@ class sharded_store final : public schema_getter {

///\brief Access must occur only on shard 0.
schema_id _next_schema_id{1};
protobuf_renderer_v2 _v2_renderer;
};

} // namespace pandaproxy::schema_registry
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ namespace pps = pp::schema_registry;
namespace {

struct simple_sharded_store {
simple_sharded_store() {
explicit simple_sharded_store(
pps::protobuf_renderer_v2 proto_v2 = pps::protobuf_renderer_v2::no)
: store{proto_v2} {
store.start(pps::is_mutable::yes, ss::default_smp_service_group())
.get();
}
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ using default_to_global = ss::bool_class<struct default_to_global_tag>;
using force = ss::bool_class<struct force_tag>;
using normalize = ss::bool_class<struct normalize_tag>;
using verbose = ss::bool_class<struct verbose_tag>;
using protobuf_renderer_v2 = ss::bool_class<struct protobuf_renderer_v2_tag>;

template<typename E>
std::enable_if_t<std::is_enum_v<E>, std::optional<E>>
Expand Down

0 comments on commit 85998a3

Please sign in to comment.