tests: base id_allocator_stm_test on raft_fixture
As part of deprecating simple_raft_fixture in favour of raft_fixture,
make id_allocator_stm_test use the latter. For compatibility with
raft_fixture::retry_with_leader I had to change the id_allocator_stm
return type to result.
bashtanov committed May 7, 2024
1 parent 403b139 commit d946a43
Showing 5 changed files with 143 additions and 75 deletions.
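
The rationale in the commit message — that raft_fixture::retry_with_leader composes more naturally with a result-typed return than with the old {id, raft_status} struct — can be illustrated with a small, self-contained sketch. Everything below (the toy result, errc, retry_until helper, and the example ids) is a hypothetical stand-in rather than the Redpanda or outcome API; it only shows how an error-carrying return value lets a retry helper decide when to keep retrying and when to hand back the allocated id.

```cpp
// Hypothetical sketch, not the Redpanda implementation: a minimal result<T, E>
// and a toy retry loop in the spirit of retry_with_leader.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>

enum class errc { success, timeout, not_leader };

template<typename T, typename E>
struct result {
    std::optional<T> val;
    E err{};
    explicit operator bool() const { return val.has_value(); }
    T assume_value() const { return *val; }
    E assume_error() const { return err; }
};

using stm_allocation_result = result<int64_t, errc>;

// Keep calling the operation until it yields a value or the deadline passes;
// a failed result carries the raft error, so no sentinel id (-1) is needed.
template<typename Op>
stm_allocation_result
retry_until(std::chrono::steady_clock::time_point deadline, Op op) {
    stm_allocation_result r{std::nullopt, errc::not_leader};
    do {
        r = op();
        if (r) {
            return r; // success: the value channel holds the allocated id
        }
    } while (std::chrono::steady_clock::now() < deadline);
    return r; // give up and surface the last error to the caller
}

int main() {
    int attempts = 0;
    auto r = retry_until(
      std::chrono::steady_clock::now() + std::chrono::seconds(1),
      [&]() -> stm_allocation_result {
          // Fail twice with a retryable error, then succeed with id 42.
          if (++attempts < 3) {
              return {std::nullopt, errc::not_leader};
          }
          return {42, errc::success};
      });
    if (r) {
        std::cout << "allocated id " << r.assume_value() << "\n";
    } else {
        std::cout << "allocation failed\n";
    }
}
```
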
12 changes: 6 additions & 6 deletions src/v/cluster/id_allocator_frontend.cc
@@ -121,11 +121,11 @@ reset_id_handler::process(ss::shard_id shard, reset_id_allocator_request req) {
}
return stm->reset_next_id(id, timeout)
.then([](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
raft::make_error_code(r.assume_error()).message());
return reset_id_allocator_reply{errc::replication_error};
}

@@ -159,15 +159,15 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
}
return stm->allocate_id(timeout).then(
[](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
return allocate_id_reply{r.id, errc::replication_error};
raft::make_error_code(r.assume_error()).message());
return allocate_id_reply{-1, errc::replication_error};
}

return allocate_id_reply{r.id, errc::success};
return allocate_id_reply{r.assume_value(), errc::success};
});
});
}
20 changes: 10 additions & 10 deletions src/v/cluster/id_allocator_stm.cc
@@ -77,26 +77,26 @@ id_allocator_stm::reset_next_id(
.with(
timeout, [this, id, timeout]() { return advance_state(id, timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
return stm_allocation_result{raft::errc::timeout};
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::advance_state(
int64_t value, model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return stm_allocation_result{raft::errc::timeout};
}
if (value < _curr_id) {
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return stm_allocation_result{_curr_id};
}
_curr_id = value;
auto success = co_await set_state(_curr_id + _batch_size, timeout);
if (!success) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return stm_allocation_result{raft::errc::timeout};
}
_curr_batch = _batch_size;
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return stm_allocation_result(_curr_id);
}

ss::future<bool> id_allocator_stm::set_state(
@@ -124,30 +124,30 @@ id_allocator_stm::allocate_id(model::timeout_clock::duration timeout) {
return _lock
.with(timeout, [this, timeout]() { return do_allocate_id(timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
return stm_allocation_result{raft::errc::timeout};
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::do_allocate_id(model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return stm_allocation_result{raft::errc::timeout};
}

if (_curr_batch == 0) {
_curr_id = _state;
if (!co_await set_state(_curr_id + _batch_size, timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return stm_allocation_result{raft::errc::timeout};
}
_curr_batch = _batch_size;
}

auto id = _curr_id;
int64_t id = _curr_id;

_curr_id += 1;
_curr_batch -= 1;

co_return stm_allocation_result{id, raft::errc::success};
co_return stm_allocation_result{id};
}

ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
5 changes: 1 addition & 4 deletions src/v/cluster/id_allocator_stm.h
@@ -34,10 +34,7 @@ class id_allocator_stm final : public raft::persisted_stm<> {
public:
static constexpr std::string_view name = "id_allocator_stm";

struct stm_allocation_result {
int64_t id;
raft::errc raft_status{raft::errc::success};
};
using stm_allocation_result = result<int64_t, raft::errc>;

explicit id_allocator_stm(ss::logger&, raft::consensus*);

24 changes: 23 additions & 1 deletion src/v/cluster/tests/CMakeLists.txt
@@ -34,7 +34,6 @@ set(srcs
distributed_kv_stm_tests.cc
rm_stm_tests.cc
rm_stm_compatibility_test.cc
id_allocator_stm_test.cc
local_monitor_test.cc
tx_compaction_tests.cc
producer_state_tests.cc
@@ -51,6 +50,29 @@ rp_test(
)
endforeach()

set(srcs
id_allocator_stm_test.cc
)

foreach(cluster_test_src ${srcs})
get_filename_component(test_name ${cluster_test_src} NAME_WE)
rp_test(
UNIT_TEST
GTEST
BINARY_NAME ${test_name}
SOURCES ${cluster_test_src}
LIBRARIES
v::gtest_main
v::application
v::storage_test_utils
v::cluster
v::http_test_utils
v::raft
v::raft_fixture
LABELS cluster
)
endforeach()

set(srcs
manual_log_deletion_test.cc
cluster_tests.cc
157 changes: 103 additions & 54 deletions src/v/cluster/tests/id_allocator_stm_test.cc
@@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "base/outcome.h"
#include "base/vassert.h"
#include "cluster/id_allocator_stm.h"
#include "config/configuration.h"
#include "model/fundamental.h"
@@ -22,112 +23,160 @@
#include "test_utils/async.h"
#include "test_utils/fixture.h"
#include "test_utils/scoped_config.h"
#include "tests/stm_test_fixture.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/log.hh>

#include <boost/range/irange.hpp>
#include <boost/test/tools/old/interface.hpp>

#include <thread>

using namespace std::chrono_literals;

namespace cluster {
namespace {

ss::logger idstmlog{"idstm-test"};

struct id_allocator_stm_fixture : simple_raft_fixture {
void create_stm_and_start_raft() {
struct id_allocator_stm_fixture : state_machine_fixture {
ss::future<> initialize_state_machines() {
// set configuration parameters
test_local_cfg.get("id_allocator_batch_size").set_value(int16_t(1));
test_local_cfg.get("id_allocator_log_capacity").set_value(int16_t(2));
create_raft();
raft::state_machine_manager_builder stm_m_builder;

_stm = stm_m_builder.create_stm<cluster::id_allocator_stm>(
idstmlog, _raft.get(), config::shard_local_cfg());
create_nodes();
co_await start_nodes();
}

ss::future<> start_node(raft_node_instance& node) {
co_await node.initialise(all_vnodes());
raft::state_machine_manager_builder builder;
auto stm = builder.create_stm<cluster::id_allocator_stm>(
idstmlog, node.raft().get(), config::shard_local_cfg());
co_await node.start(std::move(builder));
node_stms.emplace(node.get_vnode(), std::move(stm));
}

ss::future<> start_nodes() {
co_await parallel_for_each_node(
[this](raft_node_instance& node) { return start_node(node); });
}

ss::future<> stop_and_recreate_nodes() {
absl::flat_hash_map<model::node_id, ss::sstring> data_directories;
for (auto& [id, node] : nodes()) {
data_directories[id]
= node->raft()->log()->config().base_directory();
node_stms.erase(node->get_vnode());
}

co_await ss::parallel_for_each(
std::views::keys(data_directories),
[this](model::node_id id) { return stop_node(id); });

for (auto& [id, data_dir] : data_directories) {
add_node(id, model::revision_id(0), std::move(data_dir));
}
}

ss::future<> reset(int64_t id) {
auto result = co_await retry_with_leader(
model::timeout_clock::now() + 30s,
[this, id](raft_node_instance& leader_node) {
auto stm = node_stms[leader_node.get_vnode()];
return stm->reset_next_id(id, 1s);
});
vassert(result.has_value(), "raft error");
vassert(result.assume_value() == id, "unexpected value");
}

_raft->start(std::move(stm_m_builder)).get();
_started = true;
ss::future<int64_t> allocate1(int64_t cur_last_id) {
auto result = co_await retry_with_leader(
model::timeout_clock::now() + 30s,
[this](raft_node_instance& leader_node) {
auto stm = node_stms[leader_node.get_vnode()];
return stm->allocate_id(1s);
});
vassert(result.has_value(), "raft error");
vassert(result.assume_value() > cur_last_id, "unexpected value");
cur_last_id = result.assume_value();
co_return cur_last_id;
}

// Allocates n IDs, ensuring that each new ID is greater than the
// previous one, starting with 'cur_last_id'.
// Returns the last allocated ID.
int64_t allocate_n(int64_t cur_last_id, int n) {
// One by one to be able to inject raft quirks in future.
ss::future<int64_t> allocate_n(int64_t cur_last_id, int n) {
for (int i = 0; i < n; i++) {
auto result = _stm->allocate_id(1s).get0();

BOOST_REQUIRE_EQUAL(raft::errc::success, result.raft_status);
BOOST_REQUIRE_LT(cur_last_id, result.id);

cur_last_id = result.id;
cur_last_id = co_await allocate1(cur_last_id);
}
return cur_last_id;
co_return cur_last_id;
}

ss::shared_ptr<cluster::id_allocator_stm> _stm;
absl::flat_hash_map<raft::vnode, ss::shared_ptr<cluster::id_allocator_stm>>
node_stms;
scoped_config test_local_cfg;
};

FIXTURE_TEST(stm_monotonicity_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
} // namespace

TEST_F_CORO(id_allocator_stm_fixture, stm_monotonicity_test) {
co_await initialize_state_machines();

int64_t last_id = -1;
allocate_n(last_id, 5);
co_await allocate_n(last_id, 5);
}

FIXTURE_TEST(stm_restart_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
TEST_F_CORO(id_allocator_stm_fixture, stm_restart_test) {
co_await initialize_state_machines();

int64_t last_id = -1;

last_id = allocate_n(last_id, 5);
stop_all();
create_stm_and_start_raft();
wait_for_confirmed_leader();

allocate_n(last_id, 5);
last_id = co_await allocate_n(last_id, 5);
co_await stop_and_recreate_nodes();
co_await start_nodes();
co_await allocate_n(last_id, 5);
}

FIXTURE_TEST(stm_reset_id_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
TEST_F_CORO(id_allocator_stm_fixture, stm_reset_id_test) {
co_await initialize_state_machines();

int64_t last_id = -1;
allocate_n(last_id, 5);
co_await allocate_n(last_id, 5);

// Reset to 100.
last_id = 100;
_stm->reset_next_id(last_id + 1, 1s).get();
last_id = allocate_n(last_id, 5);
BOOST_REQUIRE_EQUAL(last_id, 105);
co_await reset(last_id + 1);
last_id = co_await allocate_n(last_id, 5);
ASSERT_EQ_CORO(last_id, 105);

// Even after restarting, the starting point should be where we left off.
stop_all();
create_stm_and_start_raft();
wait_for_confirmed_leader();
co_await stop_and_recreate_nodes();
co_await start_nodes();

last_id = allocate_n(last_id, 5);
BOOST_REQUIRE_EQUAL(last_id, 110);
last_id = co_await allocate_n(last_id, 5);
ASSERT_EQ_CORO(last_id, 110);
}

FIXTURE_TEST(stm_reset_batch_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
TEST_F_CORO(id_allocator_stm_fixture, stm_reset_batch_test) {
co_await initialize_state_machines();
int64_t last_id = -1;
allocate_n(last_id, 5);
co_await allocate_n(last_id, 5);

last_id = 100;
_stm->reset_next_id(last_id + 1, 1s).get();
co_await reset(last_id + 1);

// After a leadership change, the reset should still take effect. However,
// it should be offset by one batch.
_raft->step_down("test").get();
wait_for_confirmed_leader();
last_id = allocate_n(last_id, 1);
BOOST_REQUIRE_EQUAL(last_id, 102);
co_await with_leader(1s, [](raft_node_instance& leader_node) {
return leader_node.raft()->step_down("test");
});

last_id = co_await allocate_n(last_id, 1);
vassert(last_id == 102, "unexpected value");
}

} // namespace cluster
