Skip to content

Commit

Permalink
schema_registry: Support the compatible format for CONFIG value
Browse files Browse the repository at this point in the history
An example of a config record to be supported.

```json
{
  "topic": "_schemas",
  "key": "{\"keytype\":\"CONFIG\",\"subject\":\"test-ben1234\",\"magic\":0}",
  "value": "{\"subject\":\"test-ben1234\",\"compatibilityLevel\":\"FULL_TRANSITIVE\"}",
  "timestamp": 1699574968787,
  "partition": 0,
  "offset": 2
}

```
The subject field in the value is not checked against the key.
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Nov 10, 2023
1 parent 80f85e0 commit ae94457
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
25 changes: 20 additions & 5 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,16 @@ class config_key_handler : public json::base_handler<Encoding> {

struct config_value {
compatibility_level compat{compatibility_level::none};
std::optional<subject> sub;

friend bool operator==(const config_value&, const config_value&) = default;

friend std::ostream& operator<<(std::ostream& os, const config_value& v) {
if (v.sub.has_value()) {
fmt::print(os, "subject: {}, ", v.sub.value());
}
fmt::print(os, "compatibility: {}", to_string_view(v.compat));

return os;
}
};
Expand All @@ -779,6 +784,10 @@ inline void rjson_serialize(
::json::Writer<::json::StringBuffer>& w,
const schema_registry::config_value& val) {
w.StartObject();
if (val.sub.has_value()) {
w.Key("subject");
::json::rjson_serialize(w, val.sub.value());
}
w.Key("compatibilityLevel");
::json::rjson_serialize(w, to_string_view(val.compat));
w.EndObject();
Expand All @@ -790,6 +799,7 @@ class config_value_handler : public json::base_handler<Encoding> {
empty = 0,
object,
compatibility,
subject,
};
state _state = state::empty;

Expand All @@ -803,11 +813,12 @@ class config_value_handler : public json::base_handler<Encoding> {

bool Key(const Ch* str, ::json::SizeType len, bool) {
auto sv = std::string_view{str, len};
if (_state == state::object && sv == "compatibilityLevel") {
_state = state::compatibility;
return true;
}
return false;
std::optional<state> s{
string_switch<std::optional<state>>(sv)
.match("compatibilityLevel", state::compatibility)
.match("subject", state::subject)
.default_match(std::nullopt)};
return s.has_value() && std::exchange(_state, *s) == state::object;
}

bool String(const Ch* str, ::json::SizeType len, bool) {
Expand All @@ -819,6 +830,10 @@ class config_value_handler : public json::base_handler<Encoding> {
_state = state::object;
}
return s.has_value();
} else if (_state == state::subject) {
result.sub.emplace(sv);
_state = state::object;
return true;
}
return false;
}
Expand Down
19 changes: 19 additions & 0 deletions src/v/pandaproxy/schema_registry/test/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ constexpr std::string_view config_value_sv{
const pps::config_value config_value{
.compat = pps::compatibility_level::forward_transitive};

constexpr std::string_view config_value_sub_sv{
R"({
"subject": "my-kafka-value",
"compatibilityLevel": "FORWARD_TRANSITIVE"
})"};
const pps::config_value config_value_sub{

.compat = pps::compatibility_level::forward_transitive,
.sub{pps::subject{"my-kafka-value"}}};

constexpr std::string_view delete_subject_key_sv{
R"({
"keytype": "DELETE_SUBJECT",
Expand Down Expand Up @@ -161,6 +171,15 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) {
BOOST_CHECK_EQUAL(str, ::json::minify(config_value_sv));
}

{
auto val = ppj::rjson_parse(
config_value_sub_sv.data(), pps::config_value_handler<>{});
BOOST_CHECK_EQUAL(config_value_sub, val);

auto str = ppj::rjson_serialize(config_value_sub);
BOOST_CHECK_EQUAL(str, ::json::minify(config_value_sub_sv));
}

{
auto val = ppj::rjson_parse(
delete_subject_key_sv.data(), pps::delete_subject_key_handler<>{});
Expand Down

0 comments on commit ae94457

Please sign in to comment.