diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index 94666d12e104..3c96ec611118 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -31,3 +31,71 @@ redpanda_cc_library( "@seastar", ], ) + +redpanda_cc_library( + name = "logger", + hdrs = [ + "logger.h", + ], + include_prefix = "datalake", + visibility = [":__subpackages__"], + deps = [ + "//src/v/base", + "@seastar", + ], +) + +redpanda_cc_library( + name = "types", + hdrs = [ + "errors.h", + "fwd.h", + ], + include_prefix = "datalake", + visibility = ["//visibility:public"], + deps = [ + "//src/v/base", + "//src/v/serde", + ], +) + +redpanda_cc_library( + name = "writer", + hdrs = [ + "data_writer_interface.h", + "schemaless_translator.h", + ], + include_prefix = "datalake", + visibility = [":__subpackages__"], + deps = [ + "//src/v/base", + "//src/v/iceberg:datatypes", + "//src/v/iceberg:values", + "//src/v/serde", + ], +) + +redpanda_cc_library( + name = "manager", + srcs = [ + "datalake_manager.cc", + ], + hdrs = [ + "datalake_manager.h", + ], + include_prefix = "datalake", + visibility = ["//visibility:public"], + deps = [ + ":logger", + ":types", + "//src/v/base", + "//src/v/cluster", + "//src/v/datalake/coordinator:frontend", + "//src/v/model", + "//src/v/raft", + "//src/v/serde", + "//src/v/ssx:semaphore", + "@fmt", + "@seastar", + ], +) diff --git a/src/v/datalake/CMakeLists.txt b/src/v/datalake/CMakeLists.txt index 809b0fd436f3..df9361acdabf 100644 --- a/src/v/datalake/CMakeLists.txt +++ b/src/v/datalake/CMakeLists.txt @@ -2,16 +2,19 @@ find_package(Arrow REQUIRED) find_package(Parquet REQUIRED) find_package(Protobuf REQUIRED) +add_subdirectory(coordinator) v_cc_library( NAME datalake SRCS arrow_translator.cc + datalake_manager.cc parquet_writer.cc record_multiplexer.cc schemaless_translator.cc schema_protobuf.cc DEPS + v::datalake_coordinator v::storage Seastar::seastar Arrow::arrow_shared diff --git a/src/v/datalake/coordinator/BUILD b/src/v/datalake/coordinator/BUILD new file mode 
100644 index 000000000000..234ad9552be9 --- /dev/null +++ b/src/v/datalake/coordinator/BUILD @@ -0,0 +1,73 @@ +load("//bazel:build.bzl", "redpanda_cc_library") +load("//src/v/rpc:compiler.bzl", "redpanda_cc_rpc_library") + +redpanda_cc_rpc_library( + name = "generated_datalake_coordinator_rpc", + src = "rpc.json", + include_prefix = "datalake/coordinator", +) + +redpanda_cc_library( + name = "model", + hdrs = [ + "types.h", + ], + include_prefix = "datalake/coordinator", + visibility = [":__subpackages__"], + deps = [ + "//src/v/serde", + "//src/v/serde:enum", + "//src/v/datalake:types", + # todo: split writer further once it evolves + "//src/v/datalake:writer", + ], +) + +redpanda_cc_library( + name = "stm", + srcs = [ + "state_machine.cc", + ], + hdrs = [ + "state_machine.h", + ], + include_prefix = "datalake/coordinator", + visibility = ["//visibility:public"], + deps = [ + ":model", + "//src/v/cluster", + "//src/v/datalake:logger", + "//src/v/raft", + "//src/v/serde", + "//src/v/serde:enum", + ], +) + +redpanda_cc_library( + name = "frontend", + srcs = [ + "frontend.cc", + "service.cc", + ], + hdrs = [ + "frontend.h", + "service.h", + ], + include_prefix = "datalake/coordinator", + visibility = ["//visibility:public"], + deps = [ + ":generated_datalake_coordinator_rpc", + ":model", + ":stm", + "//src/v/base", + "//src/v/cluster", + "//src/v/container:fragmented_vector", + "//src/v/datalake:logger", + "//src/v/datalake:types", + "//src/v/model", + "//src/v/raft", + "//src/v/rpc", + "@fmt", + "@seastar", + ], +) diff --git a/src/v/datalake/coordinator/CMakeLists.txt b/src/v/datalake/coordinator/CMakeLists.txt new file mode 100644 index 000000000000..1dfff9131270 --- /dev/null +++ b/src/v/datalake/coordinator/CMakeLists.txt @@ -0,0 +1,21 @@ +include(rpcgen) + +rpcgen(TARGET generated_datalake_coordinator_rpc + IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/rpc.json + OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/rpc_service.h + INCLUDES ${CMAKE_BINARY_DIR}/src/v +) + +v_cc_library( + 
NAME datalake_coordinator + SRCS + frontend.cc + service.cc + state_machine.cc + DEPS + generated_datalake_coordinator_rpc + v::cluster + v::model + v::rpc + Seastar::seastar +) diff --git a/src/v/datalake/coordinator/frontend.cc b/src/v/datalake/coordinator/frontend.cc new file mode 100644 index 000000000000..62b38edda4f3 --- /dev/null +++ b/src/v/datalake/coordinator/frontend.cc @@ -0,0 +1,259 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "datalake/coordinator/frontend.h" + +#include "cluster/metadata_cache.h" +#include "cluster/partition_leaders_table.h" +#include "cluster/partition_manager.h" +#include "cluster/shard_table.h" +#include "cluster/topics_frontend.h" +#include "datalake/coordinator/state_machine.h" +#include "datalake/logger.h" +#include "raft/group_manager.h" +#include "rpc/connection_cache.h" + +namespace datalake::coordinator { + +template +requires requires(frontend::proto_t f, req_t req, ::rpc::client_opts opts) { + (f.*Func)(std::move(req), std::move(opts)); +} +auto frontend::remote_dispatch(req_t request, model::node_id leader_id) { + using resp_t = req_t::resp_t; + return _connection_cache->local() + .with_node_client( + _self, + ss::this_shard_id(), + leader_id, + rpc_timeout, + [request = std::move(request)](proto_t proto) mutable { + return (proto.*Func)( + std::move(request), + ::rpc::client_opts{model::timeout_clock::now() + rpc_timeout}); + }) + .then(&::rpc::get_ctx_data) + .then([leader_id](result r) { + if (r.has_error()) { + vlog( + datalake::datalake_log.warn, + "got error {} on coordinator {}", + r.error().message(), + leader_id); + return resp_t{datalake::coordinator_errc::timeout}; + } + return r.value(); + }); +} + 
+template +requires requires( + datalake::coordinator::frontend f, const model::ntp& ntp, req_t req) { + (f.*LocalFunc)(std::move(req), ntp, ss::shard_id{0}); +} +auto frontend::process(req_t req, bool local_only) { + using resp_t = req_t::resp_t; + return ensure_topic_exists().then([req = std::move(req), local_only, this]( + bool exists) mutable { + if (!exists) { + return ss::make_ready_future( + resp_t{datalake::coordinator_errc::coordinator_topic_not_exists}); + } + auto cp = coordinator_partition(req.topic_partition()); + model::ntp c_ntp{ + model::datalake_coordinator_nt.ns, + model::datalake_coordinator_nt.tp, + cp.value()}; + auto leader = _leaders->local().get_leader(c_ntp); + if (leader == _self) { + auto shard = _shard_table->local().shard_for(c_ntp); + if (shard) { + return (this->*LocalFunc)( + std::move(req), std::move(c_ntp), shard.value()); + } + } else if (leader && !local_only) { + return remote_dispatch(std::move(req), leader.value()); + } + return ss::make_ready_future( + resp_t{datalake::coordinator_errc::not_leader}); + }); +} + +// -- explicit instantiations --- +template auto + frontend::remote_dispatch<&frontend::client::add_translated_data_files>( + add_translated_data_files_request, model::node_id); + +template auto frontend::process< + &frontend::add_translated_data_files_locally, + &frontend::client::add_translated_data_files>( + add_translated_data_files_request, bool); + +template auto + frontend::remote_dispatch<&frontend::client::fetch_latest_data_file>( + fetch_latest_data_file_request, model::node_id); + +template auto frontend::process< + &frontend::fetch_latest_data_file_locally, + &frontend::client::fetch_latest_data_file>( + fetch_latest_data_file_request, bool); + +// -- explicit instantiations --- + +frontend::frontend( + model::node_id self, + ss::sharded* group_mgr, + ss::sharded* partition_mgr, + ss::sharded* topics_frontend, + ss::sharded* metadata, + ss::sharded* leaders, + ss::sharded* shards) + : _self(self) + , 
_group_mgr(group_mgr) + , _partition_mgr(partition_mgr) + , _topics_frontend(topics_frontend) + , _metadata(metadata) + , _leaders(leaders) + , _shard_table(shards) {} + +ss::future<> frontend::stop() { return _gate.close(); } + +std::optional +frontend::coordinator_partition(const model::topic_partition& tp) const { + const auto md = _metadata->local().get_topic_metadata_ref( + model::datalake_coordinator_nt); + if (!md) { + return std::nullopt; + } + iobuf temp; + write(temp, tp); + auto bytes = iobuf_to_bytes(temp); + auto partition = murmur2(bytes.data(), bytes.size()) + % md->get().get_configuration().partition_count; + return model::partition_id{static_cast(partition)}; +} + +ss::future frontend::ensure_topic_exists() { + // todo: make these configurable. + static constexpr int16_t default_replication_factor = 3; + static constexpr int32_t default_coordinator_partitions = 3; + + const auto& metadata = _metadata->local(); + if (metadata.get_topic_metadata_ref(model::datalake_coordinator_nt)) { + co_return true; + } + auto replication_factor = default_replication_factor; + if (replication_factor > static_cast(metadata.node_count())) { + replication_factor = 1; + } + + cluster::topic_configuration topic{ + model::datalake_coordinator_nt.ns, + model::datalake_coordinator_nt.tp, + default_coordinator_partitions, + replication_factor}; + + topic.properties.compression = model::compression::none; + // todo: fix this by implementing on demand raft + // snapshots. 
+ topic.properties.cleanup_policy_bitflags + = model::cleanup_policy_bitflags::none; + topic.properties.retention_bytes = tristate(); + topic.properties.retention_local_target_bytes = tristate(); + topic.properties.retention_duration = tristate(); + topic.properties.retention_local_target_ms + = tristate(); + + try { + auto res = co_await _topics_frontend->local().autocreate_topics( + {std::move(topic)}, + config::shard_local_cfg().create_topic_timeout_ms()); + vassert( + res.size() == 1, + "Incorrect result when creating {}, expected 1 response, got: {}", + model::datalake_coordinator_nt, + res.size()); + if ( + res[0].ec != cluster::errc::success + && res[0].ec != cluster::errc::topic_already_exists) { + vlog( + datalake::datalake_log.warn, + "can not create topic: {} - error: {}", + model::datalake_coordinator_nt, + cluster::make_error_code(res[0].ec).message()); + co_return false; + } + co_return true; + } catch (...) { + vlog( + datalake::datalake_log.warn, + "can not create topic {} - exception: {}", + model::datalake_coordinator_nt, + std::current_exception()); + co_return false; + } +} + +ss::future +frontend::add_translated_data_files_locally( + add_translated_data_files_request request, + const model::ntp& coordinator_partition, + ss::shard_id shard) { + co_return co_await _partition_mgr->invoke_on( + shard, + [&coordinator_partition, + req = std::move(request)](cluster::partition_manager& mgr) mutable { + auto partition = mgr.get(coordinator_partition); + if (!partition) { + return ssx::now( + add_translated_data_files_reply{coordinator_errc::not_leader}); + } + auto stm = partition->raft()->stm_manager()->get(); + return stm->add_translated_data_file(std::move(req)); + }); +} + +ss::future frontend::add_translated_data_files( + add_translated_data_files_request request, local_only local_only_exec) { + auto holder = _gate.hold(); + co_return co_await process< + &frontend::add_translated_data_files_locally, + &client::add_translated_data_files>( 
std::move(request), bool(local_only_exec)); +} + +ss::future +frontend::fetch_latest_data_file_locally( + fetch_latest_data_file_request request, + const model::ntp& coordinator_partition, + ss::shard_id shard) { + co_return co_await _partition_mgr->invoke_on( + shard, + [&coordinator_partition, + req = std::move(request)](cluster::partition_manager& mgr) mutable { + auto partition = mgr.get(coordinator_partition); + if (!partition) { + return ssx::now( + fetch_latest_data_file_reply{coordinator_errc::not_leader}); + } + auto stm = partition->raft()->stm_manager()->get(); + return stm->fetch_latest_data_file(std::move(req)); + }); +} + +ss::future frontend::fetch_latest_data_file( + fetch_latest_data_file_request request, local_only local_only_exec) { + auto holder = _gate.hold(); + co_return co_await process< + &frontend::fetch_latest_data_file_locally, + &client::fetch_latest_data_file>( + std::move(request), bool(local_only_exec)); +} + +} // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/frontend.h b/src/v/datalake/coordinator/frontend.h new file mode 100644 index 000000000000..ac808129950c --- /dev/null +++ b/src/v/datalake/coordinator/frontend.h @@ -0,0 +1,102 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "base/outcome.h" +#include "base/seastarx.h" +#include "cluster/fwd.h" +#include "datalake/coordinator/rpc_service.h" +#include "datalake/coordinator/types.h" +#include "model/namespace.h" +#include "raft/fwd.h" +#include "rpc/fwd.h" + +#include +#include +#include +#include + +namespace datalake::coordinator { + +/* + * Frontend is the gateway into the coordinator state machines on a given shard. 
+ * One frontend instance per shard. + */ +class frontend : public ss::peering_sharded_service { +public: + using local_only = ss::bool_class; + + frontend( + model::node_id self, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*); + + ss::future<> stop(); + + ss::future add_translated_data_files( + add_translated_data_files_request, local_only = local_only::no); + + ss::future fetch_latest_data_file( + fetch_latest_data_file_request, local_only = local_only::no); + +private: + using proto_t = datalake::coordinator::rpc::impl:: + datalake_coordinator_rpc_client_protocol; + using client = datalake::coordinator::rpc::impl:: + datalake_coordinator_rpc_client_protocol; + + static constexpr std::chrono::seconds rpc_timeout{5}; + + // utilities for boiler plate RPC code. + + template + requires requires(proto_t f, req_t req, ::rpc::client_opts opts) { + (f.*Func)(std::move(req), std::move(opts)); + } + auto remote_dispatch(req_t request, model::node_id leader_id); + + template + requires requires( + datalake::coordinator::frontend f, const model::ntp& ntp, req_t req) { + (f.*LocalFunc)(std::move(req), ntp, ss::shard_id{0}); + } + auto process(req_t req, bool local_only); + + ss::future ensure_topic_exists(); + + std::optional + coordinator_partition(const model::topic_partition&) const; + + ss::future + add_translated_data_files_locally( + add_translated_data_files_request, + const model::ntp& coordinator_partition, + ss::shard_id); + + ss::future fetch_latest_data_file_locally( + fetch_latest_data_file_request, + const model::ntp& coordinator_partition, + ss::shard_id); + + model::node_id _self; + ss::sharded* _group_mgr; + ss::sharded* _partition_mgr; + ss::sharded* _topics_frontend; + ss::sharded* _metadata; + ss::sharded* _leaders; + ss::sharded* _shard_table; + ss::sharded<::rpc::connection_cache>* _connection_cache; + ss::gate _gate; +}; +} // namespace datalake::coordinator diff --git 
a/src/v/datalake/coordinator/rpc.json b/src/v/datalake/coordinator/rpc.json new file mode 100644 index 000000000000..1e4d5cfc2c2e --- /dev/null +++ b/src/v/datalake/coordinator/rpc.json @@ -0,0 +1,19 @@ +{ + "namespace": "datalake::coordinator::rpc::impl", + "service_name": "datalake_coordinator_rpc", + "includes": [ + "datalake/coordinator/types.h" + ], + "methods": [ + { + "name": "add_translated_data_files", + "input_type": "add_translated_data_files_request", + "output_type": "add_translated_data_files_reply" + }, + { + "name": "fetch_latest_data_file", + "input_type": "fetch_latest_data_file_request", + "output_type": "fetch_latest_data_file_reply" + } + ] +} diff --git a/src/v/datalake/coordinator/service.cc b/src/v/datalake/coordinator/service.cc new file mode 100644 index 000000000000..5565ffe1c19e --- /dev/null +++ b/src/v/datalake/coordinator/service.cc @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/coordinator/service.h" + +#include "datalake/coordinator/frontend.h" + +namespace datalake::coordinator::rpc { + +service::service( + ss::scheduling_group sg, + ss::smp_service_group smp_sg, + ss::sharded* frontend) + : impl::datalake_coordinator_rpc_service(sg, smp_sg) + , _frontend(frontend) {} + +ss::future service::add_translated_data_files( + add_translated_data_files_request request, ::rpc::streaming_context&) { + return _frontend->local().add_translated_data_files( + std::move(request), frontend::local_only::yes); +} + +ss::future service::fetch_latest_data_file( + fetch_latest_data_file_request request, ::rpc::streaming_context&) { + return _frontend->local().fetch_latest_data_file( + std::move(request), frontend::local_only::yes); +} + +} // namespace datalake::coordinator::rpc diff --git a/src/v/datalake/coordinator/service.h b/src/v/datalake/coordinator/service.h new file mode 100644 index 000000000000..756479e6707d --- /dev/null +++ b/src/v/datalake/coordinator/service.h @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once +#include "datalake/coordinator/rpc_service.h" +#include "datalake/fwd.h" + +namespace datalake::coordinator::rpc { +class service final : public impl::datalake_coordinator_rpc_service { +public: + service( + ss::scheduling_group, ss::smp_service_group, ss::sharded*); + + ss::future add_translated_data_files( + add_translated_data_files_request, ::rpc::streaming_context&) override; + + ss::future fetch_latest_data_file( + fetch_latest_data_file_request, ::rpc::streaming_context&) override; + +private: + ss::sharded* _frontend; +}; +} // namespace datalake::coordinator::rpc diff --git a/src/v/datalake/coordinator/state_machine.cc b/src/v/datalake/coordinator/state_machine.cc new file mode 100644 index 000000000000..ed6e8e1fbf5a --- /dev/null +++ b/src/v/datalake/coordinator/state_machine.cc @@ -0,0 +1,65 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/coordinator/state_machine.h" + +#include "datalake/logger.h" + +namespace datalake::coordinator { + +coordinator_stm::coordinator_stm(ss::logger& logger, raft::consensus* raft) + : raft::persisted_stm<>("datalake_coordinator_stm.snapshot", logger, raft) {} + +ss::future<> coordinator_stm::do_apply(const model::record_batch&) { + co_return; +} + +model::offset coordinator_stm::max_collectible_offset() { return {}; } + +ss::future<> +coordinator_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) { + co_return; +} + +ss::future +coordinator_stm::take_local_snapshot(ssx::semaphore_units) { + return ss::make_exception_future( + std::runtime_error{"not implemented exception"}); +} + +ss::future<> coordinator_stm::apply_raft_snapshot(const iobuf&) { co_return; } + +ss::future coordinator_stm::take_snapshot(model::offset) { + co_return iobuf{}; +} + +ss::future +coordinator_stm::add_translated_data_file(add_translated_data_files_request) { + co_return add_translated_data_files_reply{coordinator_errc::ok}; +} + +ss::future +coordinator_stm::fetch_latest_data_file(fetch_latest_data_file_request) { + co_return fetch_latest_data_file_reply{coordinator_errc::ok}; +} + +bool stm_factory::is_applicable_for(const storage::ntp_config& config) const { + const auto& ntp = config.ntp(); + return (ntp.ns == model::datalake_coordinator_nt.ns) + && (ntp.tp.topic == model::datalake_coordinator_topic); +} + +void stm_factory::create( + raft::state_machine_manager_builder& builder, raft::consensus* raft) { + auto stm = builder.create_stm(datalake_log, raft); + raft->log()->stm_manager()->add_stm(stm); +} + +} // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/state_machine.h b/src/v/datalake/coordinator/state_machine.h new file mode 100644 index 000000000000..de1dcee82b63 --- /dev/null +++ 
b/src/v/datalake/coordinator/state_machine.h @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once +#include "cluster/state_machine_registry.h" +#include "datalake/coordinator/types.h" +#include "raft/persisted_stm.h" + +namespace datalake::coordinator { + +class coordinator_stm final : public raft::persisted_stm<> { +public: + static constexpr std::string_view name = "datalake_coordinator_stm"; + + explicit coordinator_stm(ss::logger&, raft::consensus*); + + ss::future<> do_apply(const model::record_batch&) override; + + model::offset max_collectible_offset() override; + + ss::future<> + apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override; + + ss::future + take_local_snapshot(ssx::semaphore_units) override; + + ss::future<> apply_raft_snapshot(const iobuf&) final; + + ss::future take_snapshot(model::offset) final; + + ss::future + add_translated_data_file(add_translated_data_files_request); + + ss::future + fetch_latest_data_file(fetch_latest_data_file_request); + +private: +}; +class stm_factory : public cluster::state_machine_factory { +public: + stm_factory() = default; + bool is_applicable_for(const storage::ntp_config&) const final; + void create(raft::state_machine_manager_builder&, raft::consensus*) final; +}; +} // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h new file mode 100644 index 000000000000..a12918ab1def --- /dev/null +++ b/src/v/datalake/coordinator/types.h @@ -0,0 +1,112 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "container/fragmented_vector.h" +#include "datalake/data_writer_interface.h" +#include "datalake/errors.h" +#include "model/fundamental.h" +#include "serde/rw/enum.h" +#include "serde/rw/envelope.h" + +namespace datalake::coordinator { + +struct translated_data_file_entry + : serde::envelope< + translated_data_file_entry, + serde::version<0>, + serde::compat_version<0>> { + model::topic_partition tp; + // inclusive offset range + model::offset begin_offset; + model::offset end_offset; + // term of the leader that performed this + // translation + model::term_id translator_term; + + data_writer_result translation_result; + + auto serde_fields() { + return std::tie( + tp, begin_offset, end_offset, translator_term, translation_result); + } +}; + +struct add_translated_data_files_reply + : serde::envelope< + add_translated_data_files_reply, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + add_translated_data_files_reply() = default; + explicit add_translated_data_files_reply(coordinator_errc err) + : errc(err) {} + + coordinator_errc errc; + + auto serde_fields() { return std::tie(errc); } +}; +struct add_translated_data_files_request + : serde::envelope< + add_translated_data_files_request, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + using resp_t = add_translated_data_files_reply; + + add_translated_data_files_request() = default; + + model::topic_partition tp; + chunked_vector files; + model::term_id translator_term; + + const model::topic_partition& topic_partition() const { return tp; } + + auto serde_fields() { return std::tie(tp, files, 
translator_term); } +}; + +struct fetch_latest_data_file_reply + : serde::envelope< + fetch_latest_data_file_reply, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + fetch_latest_data_file_reply() = default; + explicit fetch_latest_data_file_reply(coordinator_errc err) + : errc(err) {} + + std::optional entry; + + // If not ok, the request processing has a problem. + coordinator_errc errc; + + auto serde_fields() { return std::tie(entry, errc); } +}; + +struct fetch_latest_data_file_request + : serde::envelope< + fetch_latest_data_file_request, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + using resp_t = fetch_latest_data_file_reply; + + fetch_latest_data_file_request() = default; + + model::topic_partition tp; + + const model::topic_partition& topic_partition() const { return tp; } + + auto serde_fields() { return std::tie(tp); } +}; + +} // namespace datalake::coordinator diff --git a/src/v/datalake/data_writer_interface.h b/src/v/datalake/data_writer_interface.h index e48f1252adca..13fdd82aaf50 100644 --- a/src/v/datalake/data_writer_interface.h +++ b/src/v/datalake/data_writer_interface.h @@ -10,6 +10,7 @@ #include "datalake/schemaless_translator.h" #include "iceberg/datatypes.h" #include "iceberg/values.h" +#include "serde/envelope.h" #include #include @@ -17,8 +18,14 @@ #pragma once namespace datalake { -struct data_writer_result { +struct data_writer_result + : serde::envelope< + data_writer_result, + serde::version<0>, + serde::compat_version<0>> { size_t row_count = 0; + + auto serde_fields() { return std::tie(row_count); } }; class data_writer { diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc new file mode 100644 index 000000000000..1cd4b1728685 --- /dev/null +++ b/src/v/datalake/datalake_manager.cc @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/datalake_manager.h" + +#include "cluster/partition_manager.h" +#include "datalake/coordinator/frontend.h" +#include "datalake/logger.h" +#include "raft/group_manager.h" + +#include + +namespace datalake { + +datalake_manager::datalake_manager( + model::node_id self, + ss::sharded* group_mgr, + ss::sharded* partition_mgr, + ss::sharded* topic_table, + ss::sharded* topics_frontend, + ss::sharded* leaders, + ss::sharded* shards, + ss::sharded* frontend, + ss::sharded* as, + ss::scheduling_group sg, + [[maybe_unused]] size_t memory_limit) + : _self(self) + , _group_mgr(group_mgr) + , _partition_mgr(partition_mgr) + , _topic_table(topic_table) + , _topics_frontend(topics_frontend) + , _leaders(leaders) + , _shards(shards) + , _coordinator_frontend(frontend) + , _as(as) + , _sg(sg) {} + +ss::future<> datalake_manager::stop() { co_await _gate.close(); } + +} // namespace datalake diff --git a/src/v/datalake/datalake_manager.h b/src/v/datalake/datalake_manager.h new file mode 100644 index 000000000000..efc99cf45a4c --- /dev/null +++ b/src/v/datalake/datalake_manager.h @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "base/seastarx.h" +#include "base/units.h" +#include "cluster/fwd.h" +#include "datalake/fwd.h" +#include "raft/fundamental.h" +#include "raft/fwd.h" +#include "ssx/semaphore.h" + +#include +#include +#include +#include + +namespace datalake { + +/* + * Per shard instance responsible for launching and synchronizing all datalake + * related tasks like file format translation, frontend etc. + */ +class datalake_manager : public ss::peering_sharded_service { +public: + datalake_manager( + model::node_id self, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::sharded*, + ss::scheduling_group sg, + size_t memory_limit); + + ss::future<> stop(); + +private: + ss::future<> stop_translator(raft::group_id); + model::node_id _self; + ss::sharded* _group_mgr; + ss::sharded* _partition_mgr; + ss::sharded* _topic_table; + ss::sharded* _topics_frontend; + ss::sharded* _leaders; + ss::sharded* _shards; + ss::sharded* _coordinator_frontend; + ss::sharded* _as; + ss::scheduling_group _sg; + ss::gate _gate; +}; + +} // namespace datalake diff --git a/src/v/datalake/errors.h b/src/v/datalake/errors.h index 2a67ceec281a..0d42eb1ebf10 100644 --- a/src/v/datalake/errors.h +++ b/src/v/datalake/errors.h @@ -12,6 +12,16 @@ #include namespace datalake { +enum class coordinator_errc : int16_t { + ok, + coordinator_topic_not_exists, + not_leader, + timeout, + fenced, + stale, + concurrent_requests, +}; + // TODO: Make an std::error_category instance for this enum class arrow_converter_status { ok, diff --git a/src/v/datalake/fwd.h b/src/v/datalake/fwd.h new file mode 100644 index 000000000000..1a80edb6cc28 --- /dev/null +++ b/src/v/datalake/fwd.h @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +namespace datalake { +namespace coordinator { +class frontend; +}; +class datalake_manager; +} // namespace datalake diff --git a/src/v/model/namespace.h b/src/v/model/namespace.h index ced8e1e4818e..db0d7f756a5f 100644 --- a/src/v/model/namespace.h +++ b/src/v/model/namespace.h @@ -80,6 +80,10 @@ inline const model::topic inline const model::topic_namespace transform_log_internal_nt( model::kafka_namespace, model::transform_log_internal_topic); +inline const model::topic datalake_coordinator_topic("datalake_coordinator"); +inline const model::topic_namespace datalake_coordinator_nt( + model::kafka_internal_namespace, model::datalake_coordinator_topic); + inline bool is_user_topic(topic_namespace_view tp_ns) { return tp_ns.ns == kafka_namespace && tp_ns.tp != kafka_consumer_offsets_topic diff --git a/src/v/redpanda/BUILD b/src/v/redpanda/BUILD index 97e4b53e06db..22b969768a53 100644 --- a/src/v/redpanda/BUILD +++ b/src/v/redpanda/BUILD @@ -25,6 +25,10 @@ redpanda_cc_library( "//src/v/compression", "//src/v/config", "//src/v/crypto", + "//src/v/datalake:manager", + "//src/v/datalake:types", + "//src/v/datalake/coordinator:frontend", + "//src/v/datalake/coordinator:stm", "//src/v/debug_bundle", "//src/v/features", "//src/v/finjector", diff --git a/src/v/redpanda/CMakeLists.txt b/src/v/redpanda/CMakeLists.txt index 8e954223122e..df954b5061a9 100644 --- a/src/v/redpanda/CMakeLists.txt +++ b/src/v/redpanda/CMakeLists.txt @@ -65,6 +65,7 @@ v_cc_library( Seastar::seastar v::crypto v::cluster + v::datalake v::debug_bundle v::kafka_recovery v::finjector diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index bff277269ff7..a68bae0f7b06 
100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -85,6 +85,10 @@ #include "config/seed_server.h" #include "config/types.h" #include "crypto/ossl_context_service.h" +#include "datalake/coordinator/frontend.h" +#include "datalake/coordinator/service.h" +#include "datalake/coordinator/state_machine.h" +#include "datalake/datalake_manager.h" #include "debug_bundle/debug_bundle_service.h" #include "features/feature_table_snapshot.h" #include "features/fwd.h" @@ -1457,6 +1461,33 @@ void application::wire_up_runtime_services( .get(); } + syschecks::systemd_message("Starting datalake services").get(); + construct_service( + _datalake_coordinator_fe, + node_id, + &raft_group_manager, + &partition_manager, + &controller->get_topics_frontend(), + &metadata_cache, + &controller->get_partition_leaders(), + &controller->get_shard_table()) + .get(); + + construct_service( + _datalake_manager, + node_id, + &raft_group_manager, + &partition_manager, + &controller->get_topics_state(), + &controller->get_topics_frontend(), + &controller->get_partition_leaders(), + &controller->get_shard_table(), + &_datalake_coordinator_fe, + &_as, + sched_groups.datalake_sg(), + memory_groups().datalake_max_memory()) + .get(); + construct_single_service(_monitor_unsafe, std::ref(feature_table)); construct_service(_debug_bundle_service, &storage.local().kvs()).get(); @@ -2915,6 +2946,7 @@ void application::start_runtime_services( pm.register_factory( storage.local().kvs(), config::shard_local_cfg().rm_sync_timeout_ms.bind()); + pm.register_factory(); }) .get(); partition_manager.invoke_on_all(&cluster::partition_manager::start).get(); @@ -3100,6 +3132,12 @@ void application::start_runtime_services( std::ref(controller->get_data_migration_frontend()), std::ref(controller->get_data_migration_irpc_frontend()))); + runtime_services.push_back( + std::make_unique( + sched_groups.datalake_sg(), + smp_service_groups.datalake_sg(), + &_datalake_coordinator_fe)); + 
s.add_services(std::move(runtime_services)); // Done! Disallow unknown method errors. diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index da246a416b3f..881a2061e53b 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -28,6 +28,7 @@ #include "cluster/tx_coordinator_mapper.h" #include "config/node_config.h" #include "crypto/ossl_context_service.h" +#include "datalake/fwd.h" #include "debug_bundle/fwd.h" #include "features/fwd.h" #include "finjector/stress_fiber.h" @@ -338,6 +339,9 @@ class application { // Small helpers to execute one-time upgrade actions std::vector> _migrators; + ss::sharded _datalake_coordinator_fe; + ss::sharded _datalake_manager; + // run these first on destruction deferred_actions _deferred; diff --git a/src/v/resource_mgmt/cpu_scheduling.h b/src/v/resource_mgmt/cpu_scheduling.h index 43cbe9ec1c11..3f47de26150d 100644 --- a/src/v/resource_mgmt/cpu_scheduling.h +++ b/src/v/resource_mgmt/cpu_scheduling.h @@ -39,6 +39,7 @@ class scheduling_groups final { _self_test = co_await ss::create_scheduling_group("self_test", 100); _fetch = co_await ss::create_scheduling_group("fetch", 1000); _transforms = co_await ss::create_scheduling_group("transforms", 100); + _datalake = co_await ss::create_scheduling_group("datalake", 100); } ss::future<> destroy_groups() { @@ -54,6 +55,7 @@ class scheduling_groups final { co_await destroy_scheduling_group(_self_test); co_await destroy_scheduling_group(_fetch); co_await destroy_scheduling_group(_transforms); + co_await destroy_scheduling_group(_datalake); } ss::scheduling_group admin_sg() { return _admin; } @@ -72,6 +74,7 @@ class scheduling_groups final { ss::scheduling_group node_status() { return _node_status; } ss::scheduling_group self_test_sg() { return _self_test; } ss::scheduling_group transforms_sg() { return _transforms; } + ss::scheduling_group datalake_sg() { return _datalake; } /** * @brief Scheduling group for fetch requests. 
* @@ -98,7 +101,8 @@ class scheduling_groups final { std::cref(_node_status), std::cref(_self_test), std::cref(_fetch), - std::cref(_transforms)}; + std::cref(_transforms), + std::cref(_datalake)}; } private: @@ -116,4 +120,5 @@ class scheduling_groups final { ss::scheduling_group _self_test; ss::scheduling_group _fetch; ss::scheduling_group _transforms; + ss::scheduling_group _datalake; }; diff --git a/src/v/resource_mgmt/memory_groups.cc b/src/v/resource_mgmt/memory_groups.cc index 0fa9c6d46bd0..b0208d0fcf82 100644 --- a/src/v/resource_mgmt/memory_groups.cc +++ b/src/v/resource_mgmt/memory_groups.cc @@ -33,9 +33,11 @@ struct memory_shares { constexpr static size_t recovery = 1; constexpr static size_t tiered_storage = 1; constexpr static size_t data_transforms = 1; + constexpr static size_t datalake = 1; static size_t total_shares(bool with_wasm) { - size_t total = chunk_cache + kafka + rpc + recovery + tiered_storage; + size_t total = chunk_cache + kafka + rpc + recovery + tiered_storage + + datalake; if (with_wasm) { total += data_transforms; } @@ -91,6 +93,10 @@ size_t system_memory_groups::data_transforms_max_memory() const { return subsystem_memory(); } +size_t system_memory_groups::datalake_max_memory() const { + return subsystem_memory(); +} + template size_t system_memory_groups::subsystem_memory() const { size_t per_share_amount = total_memory() diff --git a/src/v/resource_mgmt/memory_groups.h b/src/v/resource_mgmt/memory_groups.h index 37d6102a1939..4b9d3de213a5 100644 --- a/src/v/resource_mgmt/memory_groups.h +++ b/src/v/resource_mgmt/memory_groups.h @@ -76,6 +76,8 @@ class system_memory_groups { return _compaction_reserved_memory; } + size_t datalake_max_memory() const; + private: /** * Total memory for a core after the user's Wasm and compaction diff --git a/src/v/resource_mgmt/smp_groups.cc b/src/v/resource_mgmt/smp_groups.cc index 53d7f1f3121a..bddf1d7608e2 100644 --- a/src/v/resource_mgmt/smp_groups.cc +++ b/src/v/resource_mgmt/smp_groups.cc @@ 
-24,6 +24,8 @@ ss::future<> smp_groups::create_groups(config cfg) { cfg.proxy_group_max_non_local_requests); _transform = co_await create_service_group( cfg.transform_group_max_non_local_requests); + _datalake = co_await create_service_group( + cfg.datalake_group_max_non_local_requests); } ss::future<> smp_groups::destroy_groups() { @@ -32,6 +34,7 @@ ss::future<> smp_groups::destroy_groups() { co_await destroy_smp_service_group(*_cluster); co_await destroy_smp_service_group(*_proxy); co_await destroy_smp_service_group(*_transform); + co_await destroy_smp_service_group(*_datalake); } uint32_t diff --git a/src/v/resource_mgmt/smp_groups.h b/src/v/resource_mgmt/smp_groups.h index 4139777a2380..2fa3f1084a52 100644 --- a/src/v/resource_mgmt/smp_groups.h +++ b/src/v/resource_mgmt/smp_groups.h @@ -33,6 +33,8 @@ class smp_groups { = default_max_nonlocal_requests; uint32_t transform_group_max_non_local_requests = default_max_nonlocal_requests; + uint32_t datalake_group_max_non_local_requests + = default_max_nonlocal_requests; }; smp_groups() = default; @@ -43,6 +45,7 @@ class smp_groups { ss::smp_service_group cluster_smp_sg() { return *_cluster; } ss::smp_service_group proxy_smp_sg() { return *_proxy; } ss::smp_service_group transform_smp_sg() { return *_transform; } + ss::smp_service_group datalake_sg() { return *_datalake; } ss::future<> destroy_groups(); @@ -58,4 +61,5 @@ class smp_groups { std::unique_ptr _cluster; std::unique_ptr _proxy; std::unique_ptr _transform; + std::unique_ptr _datalake; }; diff --git a/src/v/resource_mgmt/tests/memory_groups_test.cc b/src/v/resource_mgmt/tests/memory_groups_test.cc index 1a83c139239b..e79b932a4ae9 100644 --- a/src/v/resource_mgmt/tests/memory_groups_test.cc +++ b/src/v/resource_mgmt/tests/memory_groups_test.cc @@ -16,20 +16,8 @@ #include #include -TEST(MemoryGroups, HasCompatibility) { - class system_memory_groups groups( - 2_GiB, /*compaction_memory_reservation=*/{}, /*wasm_enabled=*/false); - 
EXPECT_THAT(groups.chunk_cache_min_memory(), 2_GiB * .1); - EXPECT_THAT(groups.tiered_storage_max_memory(), 2_GiB * .1); - EXPECT_THAT(groups.recovery_max_memory(), 2_GiB * .1); - // These round differently than the original calculation. - EXPECT_THAT(groups.chunk_cache_max_memory(), (2_GiB * .3) - 2); - EXPECT_THAT(groups.kafka_total_memory(), (2_GiB * .3) - 2); - EXPECT_THAT(groups.rpc_total_memory(), (2_GiB * .2) - 1); - EXPECT_THAT(groups.data_transforms_max_memory(), 0); - - EXPECT_EQ(0, groups.compaction_reserved_memory()); -} +static constexpr size_t total_shares_without_transforms = 11; +static constexpr size_t total_shares_with_transforms = 12; // It's not really useful to know the exact byte values for each of these // numbers so we just make sure we're within a MB @@ -41,23 +29,57 @@ MATCHER_P(IsApprox, n, "") { return arg >= low && arg <= high; } +TEST(MemoryGroups, HasCompatibility) { + class system_memory_groups groups( + 2_GiB, /*compaction_memory_reservation=*/{}, /*wasm_enabled=*/false); + auto shares = total_shares_without_transforms; + EXPECT_THAT( + groups.chunk_cache_min_memory(), IsApprox(2_GiB * 1.0 / shares)); + EXPECT_THAT( + groups.tiered_storage_max_memory(), IsApprox(2_GiB * 1.0 / shares)); + EXPECT_THAT(groups.recovery_max_memory(), IsApprox(2_GiB * 1.0 / shares)); + // These round differently than the original calculation. 
+ EXPECT_THAT( + groups.chunk_cache_max_memory(), IsApprox(2_GiB * 3.0 / shares)); + EXPECT_THAT(groups.kafka_total_memory(), IsApprox(2_GiB * 3.0 / shares)); + EXPECT_THAT(groups.rpc_total_memory(), IsApprox(2_GiB * 2.0 / shares)); + EXPECT_THAT(groups.datalake_max_memory(), IsApprox(2_GiB * 1.0 / shares)); + EXPECT_THAT(groups.data_transforms_max_memory(), 0); + + EXPECT_EQ(0, groups.compaction_reserved_memory()); +} + TEST(MemoryGroups, DividesSharesWithWasm) { constexpr size_t user_wasm_reservation = 20_MiB; + auto total_memory = 2_GiB - user_wasm_reservation; class system_memory_groups groups( - 2_GiB - user_wasm_reservation, + total_memory, /*compaction_memory_reservation=*/{}, /*wasm_enabled=*/true); - EXPECT_THAT(groups.chunk_cache_min_memory(), IsApprox(184_MiB)); - EXPECT_THAT(groups.chunk_cache_max_memory(), IsApprox(553_MiB)); - EXPECT_THAT(groups.tiered_storage_max_memory(), IsApprox(184_MiB)); - EXPECT_THAT(groups.recovery_max_memory(), IsApprox(184_MiB)); - EXPECT_THAT(groups.kafka_total_memory(), IsApprox(553_MiB)); - EXPECT_THAT(groups.rpc_total_memory(), IsApprox(368_MiB)); - EXPECT_THAT(groups.data_transforms_max_memory(), IsApprox(184_MiB)); - EXPECT_LT( + auto shares = total_shares_with_transforms; + EXPECT_THAT( + groups.chunk_cache_min_memory(), IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.chunk_cache_max_memory(), IsApprox(total_memory * 3.0 / shares)); + EXPECT_THAT( + groups.tiered_storage_max_memory(), + IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.recovery_max_memory(), IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.kafka_total_memory(), IsApprox(total_memory * 3.0 / shares)); + EXPECT_THAT( + groups.rpc_total_memory(), IsApprox(total_memory * 2.0 / shares)); + EXPECT_THAT( + groups.data_transforms_max_memory(), + IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.datalake_max_memory(), IsApprox(total_memory * 1.0 / shares)); + EXPECT_LE( 
groups.data_transforms_max_memory() + groups.chunk_cache_max_memory() + groups.kafka_total_memory() + groups.recovery_max_memory() - + groups.rpc_total_memory() + groups.tiered_storage_max_memory(), + + groups.rpc_total_memory() + groups.tiered_storage_max_memory() + + groups.datalake_max_memory(), 2_GiB - user_wasm_reservation); EXPECT_EQ(0, groups.compaction_reserved_memory()); @@ -70,17 +92,31 @@ TEST(MemoryGroups, DividesSharesWithCompaction) { /*compaction_memory_reservation=*/ {.max_bytes = compaction_reserved_memory, .max_limit_pct = 100.0}, /*wasm_enabled=*/true); - EXPECT_THAT(groups.chunk_cache_min_memory(), IsApprox(184_MiB)); - EXPECT_THAT(groups.chunk_cache_max_memory(), IsApprox(553_MiB)); - EXPECT_THAT(groups.tiered_storage_max_memory(), IsApprox(184_MiB)); - EXPECT_THAT(groups.recovery_max_memory(), IsApprox(184_MiB)); - EXPECT_THAT(groups.kafka_total_memory(), IsApprox(553_MiB)); - EXPECT_THAT(groups.rpc_total_memory(), IsApprox(368_MiB)); - EXPECT_THAT(groups.data_transforms_max_memory(), IsApprox(184_MiB)); - EXPECT_LT( + auto total_memory = 2_GiB - compaction_reserved_memory; + auto shares = total_shares_with_transforms; + EXPECT_THAT( + groups.chunk_cache_min_memory(), IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.chunk_cache_max_memory(), IsApprox(total_memory * 3.0 / shares)); + EXPECT_THAT( + groups.tiered_storage_max_memory(), + IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.recovery_max_memory(), IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.kafka_total_memory(), IsApprox(total_memory * 3.0 / shares)); + EXPECT_THAT( + groups.rpc_total_memory(), IsApprox(total_memory * 2.0 / shares)); + EXPECT_THAT( + groups.data_transforms_max_memory(), + IsApprox(total_memory * 1.0 / shares)); + EXPECT_THAT( + groups.datalake_max_memory(), IsApprox(total_memory * 1.0 / shares)); + EXPECT_LE( groups.data_transforms_max_memory() + groups.chunk_cache_max_memory() + groups.kafka_total_memory() + 
groups.recovery_max_memory() - + groups.rpc_total_memory() + groups.tiered_storage_max_memory(), + + groups.rpc_total_memory() + groups.tiered_storage_max_memory() + + groups.datalake_max_memory(), 2_GiB - compaction_reserved_memory); EXPECT_EQ(compaction_reserved_memory, groups.compaction_reserved_memory());