Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage_e2e_single_threaded test in bazel #23875

Closed
wants to merge 69 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
a3c0c7d
datalake/proto: improved error messages for recursive types
mmaslankaprv Oct 10, 2024
a4d8711
dl/coordinator: ostream for state_update
andrwng Oct 8, 2024
0fc33ae
dl/coordinator: return message on bad state update
andrwng Oct 9, 2024
d95ff83
dl/coordinator/tests: state_test_utils
andrwng Oct 9, 2024
ea1735c
model: datalake_coordinator batch type
andrwng Oct 8, 2024
a143bad
dl/coordinator: apply datalake updates to stm
andrwng Oct 7, 2024
75014c8
dl/coordinator: scope down STM public interface
andrwng Oct 8, 2024
5463ba6
dl/coordinator: add initial coordinator
andrwng Oct 8, 2024
cb1f29e
kafka: hide supported properties list
dotnwat Oct 9, 2024
8cc13a5
kafka: add cloud topics development topic property
dotnwat Oct 9, 2024
482709e
ct: do not allow cloud topics property to change
dotnwat Oct 9, 2024
87bc4aa
ct: report cloud topics property
dotnwat Oct 9, 2024
f22d183
ct: wire up to topic property
dotnwat Oct 7, 2024
301f69e
chore: apply clang format
dotnwat Oct 9, 2024
5c8ff5a
iceberg/rest-client: Add oauth token parser
abhijat Sep 17, 2024
1a1f927
http: Make http-client abstract
abhijat Sep 17, 2024
db9d8ec
iceberg/rest-client: Add a retry policy
abhijat Sep 25, 2024
aaea11f
iceberg/rest-client: Add basic catalog client
abhijat Sep 25, 2024
e29bf11
tests/polaris: fixed polaris catalog api
mmaslankaprv Oct 3, 2024
0e2e466
tests: add pyiceberg to ducktape dependencies
mmaslankaprv Oct 3, 2024
12a8a55
tests: added test being an example of accessing polaris catalog
mmaslankaprv Oct 3, 2024
65d0185
tests/setup: applied formatting
mmaslankaprv Oct 3, 2024
9efc4cd
c/producer_state: keep producer inflight requests queue bounded
mmaslankaprv Oct 9, 2024
2c289ca
tests: added test validating idemotent producers with write caching
mmaslankaprv Oct 9, 2024
13618d6
tests: move rpk connect execution to rpk_remote
r-vasquez Oct 9, 2024
9bdfa5b
cluster: mark leadership pinning as an enterprise feature
ztlpn Oct 11, 2024
3346df7
tests: add license check test case for leadership pinning
ztlpn Oct 11, 2024
b070879
tx/group: consolidate validation checks in do_commit()
bharathv Oct 9, 2024
3d286f1
tx/group: consolidate validation checks in do_abort()
bharathv Oct 9, 2024
fa087e4
tx/group: ignore replay requests if producer not found
bharathv Oct 9, 2024
0ad76b2
debug_bundle: Validating no control characters
michael-redpanda Oct 11, 2024
83e1156
debug_bundle: Removed k8s namespace checks
michael-redpanda Oct 11, 2024
1386292
debug_bundle: Added label_select option
michael-redpanda Oct 9, 2024
8587175
datalake: batching_parquet_writer factory and tests
jcipar Oct 8, 2024
bd1f636
datalake: error logging in batching_parquet_writer
jcipar Oct 8, 2024
e44e842
datalake: use result type for data_writer_factory::create
jcipar Oct 8, 2024
d4f14bf
datalake: multiplexer handles failure to create writer
jcipar Oct 8, 2024
35153cb
datalake: use coordinator::data_file in parquet writer
jcipar Oct 9, 2024
f33b403
datalake: modify multiplexer to return translated_offset_range
jcipar Oct 9, 2024
61306d8
datalake: suppress alignment errors from Arrow
jcipar Oct 10, 2024
7352d3b
bazel: update seastar repository to use v24.3.x branch
ballard26 Oct 9, 2024
5f4d29b
cmake: update seastar to use v24.3.x branch
ballard26 Oct 9, 2024
c313a53
bazel: Use upstream unordered_dense
StephanDollberg Oct 9, 2024
8f88a51
ct: Add basic_cache_service_api interface
Lazin Oct 2, 2024
b4a2612
ct: Derive 'cloud_storage::cache' from the i-face
Lazin Oct 2, 2024
4a9c6d6
`cluster`: alter log line in `maybe_download_log()`
WillemKauf Oct 10, 2024
b9c59da
dl/coordinator: add coordinator manager
andrwng Oct 10, 2024
3980de2
dl/coordinator: use STM shared ptr in coordinator
andrwng Oct 10, 2024
842b6b8
dl/coordinator: use coordinator_manager in frontend
andrwng Oct 10, 2024
5964ec1
bazel: update lock
dotnwat Oct 11, 2024
b387751
bazel: allow redpanda library to have tags
dotnwat Oct 11, 2024
7cd3590
chore: fix various dependency issues
dotnwat Oct 7, 2024
6735f93
cluster: fix unused variable,function warnings
dotnwat Oct 11, 2024
0c4a8c4
clang-tidy: exclude bazel external directory
dotnwat Oct 11, 2024
23ff08b
iceberg: ostream operators for catalog types
andrwng Oct 11, 2024
0459783
datalake: make separate header for common table metadata
andrwng Oct 8, 2024
1aa91b2
dl/coordinator: add file committer
andrwng Oct 11, 2024
ae2bd8e
`cst`: add `initial_revision_id` to `topic_mount_manifest`
WillemKauf Oct 10, 2024
80e287a
`cst`: check for existing manifest during unmount
WillemKauf Oct 10, 2024
6ba9b34
bazel: upgrade boost
dotnwat Oct 13, 2024
bab706e
bazel: add patch for boost deps fix
dotnwat Oct 13, 2024
d34152f
bazel: add missing dependencies
dotnwat Oct 13, 2024
6bd4480
bazel: upgrade boost to incorporate patch
dotnwat Oct 13, 2024
eaf0797
debug_bundle: Added support for metrics-samples
michael-redpanda Oct 10, 2024
4b5df4a
datalake/coordinator/mgr: wire up missing start invocation
bharathv Oct 12, 2024
57b50fa
partition_mgr/notification: notify after start
bharathv Oct 12, 2024
e6c2d9e
storage/tests: add bazel test libraries
travisdowns Oct 22, 2024
919c8ba
storage/tests/e2e: fix compile errors
travisdowns Oct 22, 2024
ae71fee
storage/tests/e2e: add test to bazel
travisdowns Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
Checks: 'clang-diagnostic-*,clang-analyzer-*,cert-*,cppcoreguidelines-*,hicpp-*,modernize-*,performance-*,misc-*,bugprone-*,-modernize-use-trailing-return-type,-modernize-use-nodiscard,-hicpp-named-parameter,-misc-include-cleaner,-clang-analyzer-optin.core.EnumCastOutOfRange'
WarningsAsErrors: ''
HeaderFilterRegex: '.*'
HeaderFilterRegex: '^(?!external/.*).*'
FormatStyle: file
CheckOptions:
- key: cert-dcl16-c.NewSuffixes
Expand Down
6 changes: 3 additions & 3 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ git_override(
bazel_dep(name = "rules_boost", repo_name = "com_github_nelhage_rules_boost")
archive_override(
module_name = "rules_boost",
integrity = "sha256-Sk6Rjh7r4pUyRYQ4QO2qhgZYvdkWbyrLJTCl9oY6Z7I=",
strip_prefix = "rules_boost-21cd44506bd15e06db89aa2866b9be90002d8eec",
urls = "https://github.com/nelhage/rules_boost/archive/21cd44506bd15e06db89aa2866b9be90002d8eec.tar.gz",
integrity = "sha256-ICuTlyfxD3u1tr12YyeT5qPcA/ZkEUvtMDA2ZRzz1zc=",
strip_prefix = "rules_boost-e23cc59f87d5049618472d6ce0ca0ed5ef0c23dc",
urls = "https://github.com/nelhage/rules_boost/archive/e23cc59f87d5049618472d6ce0ca0ed5ef0c23dc.tar.gz",
)

non_module_boost_repositories = use_extension("@com_github_nelhage_rules_boost//:boost/repositories.bzl", "non_module_dependencies")
Expand Down
19 changes: 9 additions & 10 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion bazel/build.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def redpanda_cc_library(
visibility = None,
include_prefix = None,
copts = [],
deps = []):
deps = [],
tags = []):
"""
Define a Redpanda C++ library.
"""
Expand All @@ -36,6 +37,7 @@ def redpanda_cc_library(
implementation_deps = implementation_deps,
deps = deps,
copts = redpanda_copts() + copts,
tags = tags,
features = [
"layering_check",
],
Expand Down
12 changes: 6 additions & 6 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,19 @@ def data_dependency():
http_archive(
name = "seastar",
build_file = "//bazel/thirdparty:seastar.BUILD",
sha256 = "eefe8af689aac287971738e4d97ed495c7a285a7fe18b1aa8bbbfe025b528e6c",
strip_prefix = "seastar-c21f48b4aced07a80308215f7d801503a223d9db",
url = "https://github.com/redpanda-data/seastar/archive/c21f48b4aced07a80308215f7d801503a223d9db.tar.gz",
sha256 = "948e5f8ad769114a4721c81d5829f675ac51482e4a9a106d57ab313104308666",
strip_prefix = "seastar-4350d7960ec5ac8a1cf9274316c8f1ab1896d24e",
url = "https://github.com/redpanda-data/seastar/archive/4350d7960ec5ac8a1cf9274316c8f1ab1896d24e.tar.gz",
patches = ["//bazel/thirdparty:seastar-fortify-source.patch"],
patch_args = ["-p1"],
)

http_archive(
name = "unordered_dense",
build_file = "//bazel/thirdparty:unordered_dense.BUILD",
sha256 = "98c9d02ff8761d50a2cb6ebd53f78f7d311f6980aef509efdcdaa5f3868ca06c",
strip_prefix = "unordered_dense-9338f301522a965309ecec58ce61f54a52fb5c22",
url = "https://github.com/redpanda-data/unordered_dense/archive/9338f301522a965309ecec58ce61f54a52fb5c22.tar.gz",
sha256 = "8393d08b2a41949c70345926515036df55643e80118b608bcec6f4202d4a3026",
strip_prefix = "unordered_dense-f30ed41b58af8c79788e8581fe57a6faf856258e",
url = "https://github.com/martinus/unordered_dense/archive/f30ed41b58af8c79788e8581fe57a6faf856258e.tar.gz",
)

http_archive(
Expand Down
2 changes: 1 addition & 1 deletion cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ set(CMAKE_CXX_STANDARD
"C++ standard to build with.")
fetch_dep(seastar
REPO https://github.com/redpanda-data/seastar.git
TAG v24.2.x
TAG v24.3.x
PATCH_COMMAND sed -i "s/add_subdirectory (tests/# add_subdirectory (tests/g" CMakeLists.txt)

fetch_dep(avro
Expand Down
16 changes: 16 additions & 0 deletions src/v/cloud_io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,19 @@ redpanda_cc_library(
"@seastar",
],
)

redpanda_cc_library(
name = "basic_cache_service_api",
srcs = [
"basic_cache_service_api.cc",
],
hdrs = [
"basic_cache_service_api.h",
],
include_prefix = "cloud_io",
visibility = ["//visibility:public"],
deps = [
"//src/v/base",
"@seastar",
],
)
1 change: 1 addition & 0 deletions src/v/cloud_io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ v_cc_library(
NAME cloud_io
SRCS
auth_refresh_bg_op.cc
basic_cache_service_api.cc
io_resources.cc
io_result.cc
remote.cc
Expand Down
59 changes: 59 additions & 0 deletions src/v/cloud_io/basic_cache_service_api.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_io/basic_cache_service_api.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/manual_clock.hh>

namespace cloud_io {

std::ostream& operator<<(std::ostream& o, cache_element_status s) {
switch (s) {
case cache_element_status::available:
o << "cache_element_available";
break;
case cache_element_status::not_available:
o << "cache_element_not_available";
break;
case cache_element_status::in_progress:
o << "cache_element_in_progress";
break;
}
return o;
}

template<class Clock>
void basic_space_reservation_guard<Clock>::wrote_data(
uint64_t written_bytes, size_t written_objects) {
// Release the reservation, and update usage stats for how much we actually
// wrote.
_cache.reserve_space_release(
_bytes, _objects, written_bytes, written_objects);

// This reservation is now used up.
_bytes = 0;
_objects = 0;
}

template<class Clock>
basic_space_reservation_guard<Clock>::~basic_space_reservation_guard() {
if (_bytes || _objects) {
// This is the case of a failed write, where wrote_data was never
// called: release the reservation and do not acquire any space
// usage for the written data (there should be none).
_cache.reserve_space_release(_bytes, _objects, 0, 0);
}
}

template class basic_space_reservation_guard<ss::lowres_clock>;
template class basic_space_reservation_guard<ss::manual_clock>;

} // namespace cloud_io
159 changes: 159 additions & 0 deletions src/v/cloud_io/basic_cache_service_api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2021 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "base/seastarx.h"
#include "base/units.h"

#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/lowres_clock.hh>

#include <filesystem>
#include <iterator>
#include <optional>
#include <set>
#include <string_view>

namespace cloud_io {

static constexpr size_t default_write_buffer_size = 128_KiB;
static constexpr unsigned default_write_behind = 10;
static constexpr size_t default_read_buffer_size = 128_KiB;
static constexpr unsigned default_read_ahead = 10;

struct [[nodiscard]] cache_item {
ss::file body;
size_t size;
};

struct [[nodiscard]] cache_item_str {
ss::input_stream<char> body;
size_t size;
};

enum class [[nodiscard]] cache_element_status {
available,
not_available,
in_progress,
};

std::ostream& operator<<(std::ostream& o, cache_element_status);

template<class Clock>
class basic_cache_service_api;

/// RAII guard for bytes reserved in the cache: constructed prior to a call
/// to cache::put, and may be destroyed afterwards.
template<class Clock>
class basic_space_reservation_guard {
public:
basic_space_reservation_guard(
basic_cache_service_api<Clock>& cache,
uint64_t bytes,
size_t objects) noexcept
: _cache(cache)
, _bytes(bytes)
, _objects(objects) {}

basic_space_reservation_guard(const basic_space_reservation_guard&)
= delete;
basic_space_reservation_guard() = delete;
basic_space_reservation_guard(basic_space_reservation_guard&& rhs) noexcept
: _cache(rhs._cache)
, _bytes(rhs._bytes)
, _objects(rhs._objects) {
rhs._bytes = 0;
rhs._objects = 0;
}

~basic_space_reservation_guard();

/// After completing the write operation that this space reservation
/// protected, indicate how many bytes were really written: this is used to
/// atomically update cache usage stats to free the reservation and update
/// the bytes used stats together.
///
/// May only be called once per reservation.
void wrote_data(uint64_t, size_t);

private:
basic_cache_service_api<Clock>& _cache;

// Size acquired at time of reservation
uint64_t _bytes{0};
size_t _objects{0};
};

template<class Clock = ss::lowres_clock>
class basic_cache_service_api {
public:
basic_cache_service_api() = default;
basic_cache_service_api(const basic_cache_service_api&) = default;
basic_cache_service_api& operator=(const basic_cache_service_api&)
= default;
basic_cache_service_api(basic_cache_service_api&&) = default;
basic_cache_service_api& operator=(basic_cache_service_api&&) = default;
virtual ~basic_cache_service_api() = default;

/// Get cached value as a stream if it exists
///
/// \param key is a cache key
/// \param read_buffer_size is a read buffer size for the iostream
/// \param readahead number of pages that can be read asynchronously
virtual ss::future<std::optional<cache_item_str>> get(
std::filesystem::path key,
ss::io_priority_class io_priority,
size_t read_buffer_size = default_read_buffer_size,
unsigned int read_ahead = default_read_ahead)
= 0;

/// Add new value to the cache, overwrite if it's already exist
///
/// \param key is a cache key
/// \param io_priority is an io priority of disk write operation
/// \param data is an input stream containing data
/// \param write_buffer_size is a write buffer size for disk write
/// \param write_behind number of pages that can be written asynchronously
/// \param reservation caller must have reserved cache space before
/// proceeding with put
virtual ss::future<> put(
std::filesystem::path key,
ss::input_stream<char>& data,
basic_space_reservation_guard<Clock>& reservation,
ss::io_priority_class io_priority,
size_t write_buffer_size = default_write_buffer_size,
unsigned int write_behind = default_write_behind)
= 0;

/// \brief Checks if the value is cached
///
/// \note returned value \c cache_element_status::in_progress is
/// shard-local, it indicates that the object is being written by this
/// shard. If the value is being written by another shard, this function
/// returns only committed result. The value
/// \c cache_element_status::in_progress can be used as a hint since ntp are
/// stored on the same shard most of the time.
virtual ss::future<cache_element_status>
is_cached(const std::filesystem::path& key) = 0;

// Call this before starting a download, to trim the cache if necessary
// and wait until enough free space is available.
virtual ss::future<basic_space_reservation_guard<Clock>>
reserve_space(uint64_t, size_t) = 0;

// Release capacity acquired via `reserve_space`. This spawns
// a background fiber in order to be callable from the guard destructor.
virtual void reserve_space_release(uint64_t, size_t, uint64_t, size_t) = 0;
};
} // namespace cloud_io
1 change: 1 addition & 0 deletions src/v/cloud_storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ redpanda_cc_library(
"//src/v/bytes:iostream",
"//src/v/bytes:streambuf",
"//src/v/cloud_io:auth_refresh_bg_op",
"//src/v/cloud_io:basic_cache_service_api",
"//src/v/cloud_io:io_resources",
"//src/v/cloud_io:io_result",
"//src/v/cloud_io:remote",
Expand Down
Loading