Merge pull request #23822 from mmaslankaprv/datalake-mux-2
Loosen coupling of `datalake::record_multiplexer`
mmaslankaprv authored Oct 17, 2024
2 parents 640c9ab + 24742dd commit 849a5f0
Showing 12 changed files with 132 additions and 92 deletions.
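The decoupling is visible in two signature changes, summarized here as a condensed before/after sketch assembled from the hunks below (declarations only, not a compilable excerpt):

// Before: writers were handed out as seastar shared pointers and the
// multiplexer reported its result using a coordinator type.
virtual ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type) = 0;
ss::future<result<coordinator::translated_offset_range, data_writer_error>>
end_of_stream();

// After: writers are uniquely owned and the result is a multiplexer-local
// write_result that only lists the local files produced, leaving upload and
// coordinator bookkeeping to the caller.
virtual ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type) = 0;
ss::future<result<record_multiplexer::write_result, data_writer_error>>
end_of_stream();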
17 changes: 17 additions & 0 deletions src/v/datalake/BUILD
@@ -85,6 +85,7 @@ redpanda_cc_library(
include_prefix = "datalake",
visibility = [":__subpackages__"],
deps = [
":base_types",
":table_definition",
"//src/v/base",
"//src/v/datalake/coordinator:data_file",
@@ -161,3 +162,19 @@ redpanda_cc_library(
"@seastar",
],
)

redpanda_cc_library(
name = "base_types",
srcs = [
"base_types.cc",
],
hdrs = [
"base_types.h",
],
include_prefix = "datalake",
visibility = [":__subpackages__"],
deps = [
"//src/v/utils:named_type",
"@fmt",
],
)
1 change: 1 addition & 0 deletions src/v/datalake/CMakeLists.txt
@@ -26,6 +26,7 @@ v_cc_library(
schema_protobuf.cc
protobuf_utils.cc
values_protobuf.cc
base_types.cc
DEPS
v::datalake_common
v::datalake_coordinator
24 changes: 24 additions & 0 deletions src/v/datalake/base_types.cc
@@ -0,0 +1,24 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/base_types.h"

#include <fmt/core.h>
namespace datalake {
std::ostream& operator<<(std::ostream& o, const local_file_metadata& f_meta) {
fmt::print(
o,
"{{relative_path: {}, size_bytes: {}, row_count: {}, hour: {}}}",
f_meta.path,
f_meta.size_bytes,
f_meta.row_count,
f_meta.hour);
return o;
}
} // namespace datalake
36 changes: 36 additions & 0 deletions src/v/datalake/base_types.h
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once
#include "utils/named_type.h"

#include <filesystem>
namespace datalake {
/**
* Definitions of local and remote paths. As the names indicate, the local
* path always points to a location on local disk, whereas the remote path is
* the path of the object in the object store.
*/
using local_path = named_type<std::filesystem::path, struct local_path_tag>;
using remote_path = named_type<std::filesystem::path, struct remote_path_tag>;

/**
* Simple type describing local parquet file metadata with its path and basic
* statistics
*/
struct local_file_metadata {
local_path path;
size_t row_count = 0;
size_t size_bytes = 0;
int hour = 0;

friend std::ostream&
operator<<(std::ostream& o, const local_file_metadata& r);
};
} // namespace datalake
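A minimal usage sketch of the new types (the file name and values are hypothetical; it relies only on what base_types.h above declares):

#include "datalake/base_types.h"

#include <iostream>

void print_example_metadata() {
    // Describe a hypothetical local parquet file and print it via the
    // operator<< defined in base_types.cc.
    datalake::local_file_metadata meta{
      .path = datalake::local_path{std::filesystem::path{"example-0.parquet"}},
      .row_count = 128,
      .size_bytes = 4096,
      .hour = 0,
    };
    std::cout << meta << "\n";
}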
4 changes: 2 additions & 2 deletions src/v/datalake/batching_parquet_writer.cc
@@ -216,9 +216,9 @@ local_path batching_parquet_writer_factory::create_filename() const {
_base_directory()
/ fmt::format("{}-{}.parquet", _file_name_prefix, uuid_t::create())};
}
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
auto ret = ss::make_shared<batching_parquet_writer>(
auto ret = std::make_unique<batching_parquet_writer>(
std::move(schema),
_row_count_threshold,
_byte_count_threshold,
2 changes: 1 addition & 1 deletion src/v/datalake/batching_parquet_writer.h
@@ -86,7 +86,7 @@ class batching_parquet_writer_factory : public data_writer_factory {
size_t row_count_threshold,
size_t byte_count_threshold);

ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type schema) override;

private:
10 changes: 0 additions & 10 deletions src/v/datalake/data_writer_interface.cc
@@ -25,14 +25,4 @@ std::string data_writer_error_category::message(int ev) const {
}
}

std::ostream& operator<<(std::ostream& o, const local_file_metadata& f_meta) {
fmt::print(
o,
"{{relative_path: {}, size_bytes: {}, row_count: {}, hour: {}}}",
f_meta.path,
f_meta.size_bytes,
f_meta.row_count,
f_meta.hour);
return o;
}
} // namespace datalake
24 changes: 2 additions & 22 deletions src/v/datalake/data_writer_interface.h
@@ -10,33 +10,13 @@
#pragma once

#include "base/outcome.h"
#include "datalake/base_types.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"

#include <cstddef>

namespace datalake {
/**
* Definitions of local and remote paths, as the name indicates the local path
* is always pointing to the location on local disk wheras the remote path is a
* path of the object in the object store.
*/
using local_path = named_type<std::filesystem::path, struct local_path_tag>;
using remote_path = named_type<std::filesystem::path, struct remote_path_tag>;

/**
* Simple type describing local parquet file metadata with its path and basic
* statistics
*/
struct local_file_metadata {
local_path path;
size_t row_count = 0;
size_t size_bytes = 0;
int hour = 0;

friend std::ostream&
operator<<(std::ostream& o, const local_file_metadata& r);
};

enum class data_writer_error {
ok = 0,
@@ -87,7 +67,7 @@ class data_writer_factory {
data_writer_factory& operator=(data_writer_factory&&) = default;
virtual ~data_writer_factory() = default;

virtual ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
virtual ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type /* schema */) = 0;
};

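For illustration, a hypothetical factory satisfying the updated interface might look like the sketch below; my_data_writer stands in for any concrete data_writer implementation (such as the batching parquet writer above) and is not part of this commit:

// Sketch only: assumes the usual includes, e.g.
// "datalake/data_writer_interface.h" and <seastar/core/coroutine.hh>.
class my_writer_factory final : public datalake::data_writer_factory {
public:
    ss::future<
      result<std::unique_ptr<datalake::data_writer>, datalake::data_writer_error>>
    create_writer(iceberg::struct_type schema) override {
        // Each call transfers exclusive ownership of a fresh writer to the
        // caller; nothing is shared between the factory and the multiplexer.
        co_return std::make_unique<my_data_writer>(std::move(schema));
    }
};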
59 changes: 22 additions & 37 deletions src/v/datalake/record_multiplexer.cc
@@ -10,17 +10,11 @@
#include "datalake/record_multiplexer.h"

#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"
#include "datalake/schemaless_translator.h"
#include "datalake/tests/test_data_writer.h"
#include "iceberg/values.h"
#include "model/record.h"
#include "storage/parser_utils.h"

#include <ios>
#include <stdexcept>
#include <system_error>

namespace datalake {
record_multiplexer::record_multiplexer(
std::unique_ptr<data_writer_factory> writer_factory)
@@ -33,47 +27,45 @@ record_multiplexer::operator()(model::record_batch batch) {
batch = co_await storage::internal::decompress_batch(std::move(batch));
}
auto first_timestamp = batch.header().first_timestamp.value();
auto base_offset = static_cast<int64_t>(batch.base_offset());

auto it = model::record_batch_iterator::create(batch);

while (it.has_next()) {
auto record = it.next();
iobuf key = record.release_key();
iobuf val = record.release_value();
// *1000: Redpanda timestamps are milliseconds. Iceberg uses
// microseconds.
int64_t timestamp = (first_timestamp + record.timestamp_delta()) * 1000;
int64_t offset = static_cast<int64_t>(base_offset)
+ record.offset_delta();
kafka::offset offset{batch.base_offset()() + record.offset_delta()};
int64_t estimated_size = key.size_bytes() + val.size_bytes() + 16;

// TODO: we want to ensure we're using an offset translating reader so
// that these will be Kafka offsets, not Raft offsets.
if (!_result.has_value()) {
_result = coordinator::translated_offset_range{};
_result.value().start_offset = kafka::offset(offset);
}
if (offset < _result.value().start_offset()) {
_result.value().start_offset = kafka::offset(offset);
}
if (offset > _result.value().start_offset()) {
_result.value().last_offset = kafka::offset(offset);
_result = write_result{
.start_offset = offset,
};
}

_result.value().last_offset = offset;

// Translate the record
auto& translator = get_translator();
iceberg::struct_value data = translator.translate_event(
std::move(key), std::move(val), timestamp, offset);
// Send it to the writer

auto writer_result = co_await get_writer();
if (!writer_result.has_value()) {
_writer_status = writer_result.error();
_error = writer_result.error();
co_return ss::stop_iteration::yes;
}
auto writer = std::move(writer_result.value());
_writer_status = co_await writer->add_data_struct(
auto& writer = writer_result.value().get();
auto write_result = co_await writer.add_data_struct(
std::move(data), estimated_size);
if (_writer_status != data_writer_error::ok) {

if (write_result != data_writer_error::ok) {
_error = write_result;
// If a write fails, the writer is left in an indeterminate state,
// we cannot continue in this case.
co_return ss::stop_iteration::yes;
@@ -82,10 +74,10 @@
co_return ss::stop_iteration::no;
}

ss::future<result<coordinator::translated_offset_range, data_writer_error>>
ss::future<result<record_multiplexer::write_result, data_writer_error>>
record_multiplexer::end_of_stream() {
if (_writer_status != data_writer_error::ok) {
co_return _writer_status;
if (_error) {
co_return *_error;
}
// TODO: once we have multiple _writers this should be a loop
if (_writer) {
@@ -95,14 +87,8 @@ record_multiplexer::end_of_stream() {
auto result_files = co_await _writer->finish();
if (result_files.has_value()) {
auto local_file = result_files.value();
// TODO: upload files to cloud here, and fill necessary details
coordinator::data_file remote_file{
.remote_path = "",
.row_count = local_file.row_count,
.file_size_bytes = local_file.size_bytes,
.hour = local_file.hour,
};
_result.value().files.push_back(remote_file);

_result->data_files.push_back(local_file);
co_return std::move(_result.value());
} else {
co_return result_files.error();
@@ -116,7 +102,7 @@ schemaless_translator& record_multiplexer::get_translator() {
return _translator;
}

ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
ss::future<result<std::reference_wrapper<data_writer>, data_writer_error>>
record_multiplexer::get_writer() {
if (!_writer) {
auto& translator = get_translator();
@@ -126,9 +112,8 @@ record_multiplexer::get_writer() {
if (!writer_result.has_value()) {
co_return writer_result.error();
}
_writer = writer_result.value();
co_return _writer;
_writer = std::move(writer_result.value());
}
co_return _writer;
co_return *_writer;
}
} // namespace datalake
32 changes: 18 additions & 14 deletions src/v/datalake/record_multiplexer.h
@@ -9,14 +9,9 @@
*/
#pragma once

#include "base/outcome.h"
#include "container/fragmented_vector.h"
#include "datalake/coordinator/translated_offset_range.h"
#include "datalake/data_writer_interface.h"
#include "datalake/schemaless_translator.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "serde/envelope.h"

#include <seastar/core/future.hh>

@@ -40,29 +35,38 @@ for each record {
w.record(d);
}
*/
// TODO: add cleanup of files that were already written while translation
// failed.
class record_multiplexer {
public:
explicit record_multiplexer(
std::unique_ptr<data_writer_factory> writer_factory);
struct write_result {
// base offset of the first translated batch
kafka::offset start_offset;
// last offset of the last translated batch (inclusive)
kafka::offset last_offset;
// vector containing a list of files that were written during
// translation.
chunked_vector<local_file_metadata> data_files;
};
explicit record_multiplexer(std::unique_ptr<data_writer_factory> writer);

ss::future<ss::stop_iteration> operator()(model::record_batch batch);
ss::future<result<coordinator::translated_offset_range, data_writer_error>>
end_of_stream();
ss::future<result<write_result, data_writer_error>> end_of_stream();

private:
schemaless_translator& get_translator();
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
ss::future<result<std::reference_wrapper<data_writer>, data_writer_error>>
get_writer();

// TODO: in a future PR this will be a map of translators keyed by schema_id
schemaless_translator _translator;
std::unique_ptr<data_writer_factory> _writer_factory;

// TODO: similarly this will be a map keyed by schema_id
ss::shared_ptr<data_writer> _writer;

data_writer_error _writer_status = data_writer_error::ok;
std::unique_ptr<data_writer> _writer;

std::optional<coordinator::translated_offset_range> _result;
std::optional<data_writer_error> _error;
std::optional<write_result> _result;
};

} // namespace datalake
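A hedged sketch of how a caller might drive the multiplexer and consume the new write_result (reader construction, the concrete factory, and the error-handling policy are assumed and not shown in this commit):

// Sketch only: assumes the usual includes, e.g.
// "datalake/record_multiplexer.h" and <seastar/core/coroutine.hh>.
ss::future<> translate(
  model::record_batch_reader reader,
  std::unique_ptr<datalake::data_writer_factory> factory) {
    datalake::record_multiplexer mux(std::move(factory));
    // Feeds every batch through operator() and finishes with end_of_stream().
    auto res = co_await reader.consume(std::move(mux), model::no_timeout);
    if (res.has_value()) {
        auto& wr = res.value();
        // wr.start_offset and wr.last_offset bound the translated range;
        // wr.data_files lists the local parquet files still awaiting upload.
        (void)wr;
    } else {
        // res.error() is a datalake::data_writer_error.
    }
}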