Skip to content

Commit

Permalink
Merge pull request redpanda-data#23151 from mmaslankaprv/iceberg-proto
Browse files Browse the repository at this point in the history
Added protocol buffers schema translation
  • Loading branch information
mmaslankaprv authored Sep 25, 2024
2 parents 96a365f + f68d6fa commit 9481ce7
Show file tree
Hide file tree
Showing 10 changed files with 761 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("//bazel:build.bzl", "redpanda_cc_library")

redpanda_cc_library(
name = "conversion_outcome",
srcs = [
],
hdrs = [
"conversion_outcome.h",
],
include_prefix = "datalake",
deps = [
"@seastar",
],
)

redpanda_cc_library(
name = "schema_protobuf",
srcs = [
"schema_protobuf.cc",
],
hdrs = [
"schema_protobuf.h",
],
include_prefix = "datalake",
visibility = [":__subpackages__"],
deps = [
":conversion_outcome",
"//src/v/iceberg:datatypes",
"@fmt",
"@protobuf",
"@seastar",
],
)
1 change: 1 addition & 0 deletions src/v/datalake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ v_cc_library(
proto_to_arrow_struct.cc
record_multiplexer.cc
schemaless_translator.cc
schema_protobuf.cc
DEPS
v::storage
Seastar::seastar
Expand Down
32 changes: 32 additions & 0 deletions src/v/datalake/conversion_outcome.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "base/outcome.h"

namespace datalake {
class schema_conversion_exception final : public std::exception {
public:
explicit schema_conversion_exception(std::string msg) noexcept
: msg_(std::move(msg)) {}

const char* what() const noexcept final { return msg_.c_str(); }

private:
std::string msg_;
};
/**
* Class representing an outcome of schema conversion. If schema validation
* failed the outcome will contain an error. The type is simillar to the Either
* type idea, it either contains a value or result.
*/
template<typename SchemaT>
using conversion_outcome = result<SchemaT, schema_conversion_exception>;

}; // namespace datalake
185 changes: 185 additions & 0 deletions src/v/datalake/schema_protobuf.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "datalake/schema_protobuf.h"

#include "datalake/conversion_outcome.h"
#include "iceberg/datatypes.h"

#include <seastar/core/sstring.hh>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>

#include <deque>

namespace datalake {

namespace {

namespace pb = google::protobuf;

/**
* Schema conversion from protocol buffers to internal Redpanda iceberg schema
*/
using field_outcome = conversion_outcome<iceberg::nested_field_ptr>;
using struct_outcome = conversion_outcome<iceberg::struct_type>;
using type_stack = std::deque<const pb::Descriptor*>;
static constexpr int max_recursion_depth = 100;

field_outcome
from_protobuf(const pb::FieldDescriptor& fd, bool is_repeated, type_stack&);

field_outcome success(const pb::FieldDescriptor& fd, iceberg::field_type ft) {
return iceberg::nested_field::create(
fd.number(), fd.name(), iceberg::field_required::no, std::move(ft));
}

bool is_recursive_type(const pb::Descriptor& msg, type_stack& stack) {
return std::any_of(
stack.begin(), stack.end(), [&](const pb::Descriptor* d) {
return d->full_name() == msg.full_name();
});
}

struct_outcome
struct_from_protobuf(const pb::Descriptor& msg, type_stack& stack) {
if (is_recursive_type(msg, stack)) {
return schema_conversion_exception(fmt::format(
"Protocol buffer field {} not supported", msg.DebugString()));
}
if (stack.size() > max_recursion_depth) {
return schema_conversion_exception(fmt::format(
"Protocol buffer field {} not supported", msg.DebugString()));
}
stack.push_back(&msg);
iceberg::struct_type struct_t;
struct_t.fields.reserve(msg.field_count());
for (int i = 0; i < msg.field_count(); ++i) {
auto field = msg.field(i);
auto res = from_protobuf(*field, field->is_repeated(), stack);
if (res.has_error()) {
return res.error();
}
struct_t.fields.push_back(std::move(res).value());
}
return std::move(struct_t);
}

field_outcome from_protobuf(
const pb::FieldDescriptor& fd, bool is_repeated, type_stack& stack) {
if (fd.is_map()) {
auto mt = fd.message_type();
auto key_field = mt->map_key();
auto value_field = mt->map_value();
auto key_nested_res = from_protobuf(*key_field, false, stack);
if (key_nested_res.has_error()) {
return key_nested_res;
}
auto value_field_nested_res = from_protobuf(*value_field, false, stack);
if (value_field_nested_res.has_error()) {
return value_field_nested_res;
}
auto field_type = iceberg::map_type::create(
key_field->number(),
std::move(std::move(key_nested_res).assume_value()->type),
value_field->number(),
iceberg::field_required(!value_field->is_optional()),
std::move(std::move(value_field_nested_res).assume_value()->type));
return success(fd, std::move(field_type));
}
if (is_repeated) {
auto field_type_res = from_protobuf(fd, false, stack);
if (field_type_res.has_error()) {
return field_type_res;
}

auto type = iceberg::list_type::create(
fd.number(),
iceberg::field_required::no,
std::move(std::move(field_type_res).assume_value()->type));
return success(fd, std::move(type));
}

switch (fd.type()) {
case pb::FieldDescriptor::Type::TYPE_BOOL:
return success(fd, iceberg::boolean_type{});
case pb::FieldDescriptor::TYPE_DOUBLE:
return success(fd, iceberg::double_type{});
case pb::FieldDescriptor::TYPE_FLOAT:
return success(fd, iceberg::float_type{});
case pb::FieldDescriptor::TYPE_INT64:
/**
* We support 32 bits unsigned integers by casting them to the iceberg long
* type, the long type can hold the max value of uint32_t without an
* overflow. This isn't true for 64 bit unsigned integers and this is why we
* return a validation error in this case.
* This the original behavior of Parquet as in:
* https://github.com/apache/iceberg/blob/79fd977f67592a16579cff31478e7ea98ef126e4/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java#L223
*
*/
case pb::FieldDescriptor::TYPE_UINT32:
case pb::FieldDescriptor::TYPE_FIXED32:
return success(fd, iceberg::long_type{});
case pb::FieldDescriptor::TYPE_UINT64:
case pb::FieldDescriptor::TYPE_FIXED64:
return schema_conversion_exception(fmt::format(
"Protocol buffer field {} type {} not supported, iceberg only "
"supports signed integers",
fd.DebugString(),
fd.type_name()));
case pb::FieldDescriptor::TYPE_INT32:
return success(fd, iceberg::int_type{});
case pb::FieldDescriptor::TYPE_STRING:
return success(fd, iceberg::string_type{});
case pb::FieldDescriptor::TYPE_GROUP:
return schema_conversion_exception(fmt::format(
"Protocol buffer field {} type {} not supported",
fd.DebugString(),
fd.type_name()));
case pb::FieldDescriptor::TYPE_MESSAGE: {
auto msg_t = fd.message_type();

// special case for handling google.protobuf.Timestamp
if (
msg_t->well_known_type() == pb::Descriptor::WELLKNOWNTYPE_TIMESTAMP) {
return success(fd, iceberg::timestamp_type{});
}

auto st_result = struct_from_protobuf(*msg_t, stack);
stack.pop_back();
if (st_result.has_error()) {
return st_result.error();
}

return success(fd, std::move(st_result).assume_value());
}
case pb::FieldDescriptor::TYPE_BYTES:
return success(fd, iceberg::binary_type{});
case pb::FieldDescriptor::TYPE_ENUM:
[[fallthrough]];
case pb::FieldDescriptor::TYPE_SFIXED32:
return success(fd, iceberg::int_type{});
case pb::FieldDescriptor::TYPE_SFIXED64:
return success(fd, iceberg::long_type{});
case pb::FieldDescriptor::TYPE_SINT32:
return success(fd, iceberg::int_type{});
case pb::FieldDescriptor::TYPE_SINT64:
return success(fd, iceberg::long_type{});
}
}
} // namespace

conversion_outcome<iceberg::struct_type>
type_to_iceberg(const pb::Descriptor& descriptor) {
type_stack stack;
return struct_from_protobuf(descriptor, stack);
}

} // namespace datalake
21 changes: 21 additions & 0 deletions src/v/datalake/schema_protobuf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "datalake/conversion_outcome.h"
#include "google/protobuf/descriptor.h"
#include "iceberg/datatypes.h"

namespace datalake {

// convert a protobuf message schema to iceberg schema
conversion_outcome<iceberg::struct_type>
type_to_iceberg(const google::protobuf::Descriptor& pool);

} // namespace datalake
25 changes: 25 additions & 0 deletions src/v/datalake/tests/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("//bazel:test.bzl", "redpanda_cc_gtest")

redpanda_cc_gtest(
name = "schema_protobuf_test",
timeout = "short",
srcs = [
"schema_protobuf_test.cc",
],
data = [
"testdata/iceberg_ready_test_messages_edition2023.proto",
],
deps = [
"//src/v/datalake:schema_protobuf",
"//src/v/iceberg:datatypes",
"//src/v/test_utils:gtest",
"//src/v/test_utils:runfiles",
"//src/v/utils:file_io",
"@fmt",
"@googletest//:gtest",
"@protobuf",
"@protobuf//src/google/protobuf/compiler:importer",
"@protobuf//src/google/protobuf/io:tokenizer",
"@seastar",
],
)
17 changes: 17 additions & 0 deletions src/v/datalake/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set(testdata_dir "${CMAKE_CURRENT_SOURCE_DIR}/testdata")

rp_test(
UNIT_TEST
GTEST
Expand Down Expand Up @@ -40,4 +42,19 @@ rp_test(
v::iceberg_test_utils
LABELS storage
ARGS "-- -c 1"
)

rp_test(
UNIT_TEST
GTEST
USE_CWD
BINARY_NAME schema_protobuf
SOURCES schema_protobuf_test.cc
LIBRARIES
v::gtest_main
v::datalake
v::iceberg_test_utils
INPUT_FILES
"${testdata_dir}/iceberg_ready_test_messages_edition2023.proto"
ARGS "-- -c 1"
)
Loading

0 comments on commit 9481ce7

Please sign in to comment.