Skip to content

Commit

Permalink
tests: base id_allocator_stm_test on raft_fixture
Browse files Browse the repository at this point in the history
As part of deprecating simple_raft_fixture in favour of raft_fixture,
making id_allocator_stm_test to use the latter.
  • Loading branch information
bashtanov committed May 8, 2024
1 parent a5b75f1 commit 950e8dc
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 55 deletions.
23 changes: 22 additions & 1 deletion src/v/cluster/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ set(srcs
distributed_kv_stm_tests.cc
rm_stm_tests.cc
rm_stm_compatibility_test.cc
id_allocator_stm_test.cc
local_monitor_test.cc
tx_compaction_tests.cc
producer_state_tests.cc
Expand All @@ -51,6 +50,28 @@ rp_test(
)
endforeach()

set(srcs
id_allocator_stm_test.cc
)

foreach(cluster_test_src ${srcs})
get_filename_component(test_name ${cluster_test_src} NAME_WE)
rp_test(
UNIT_TEST
GTEST
BINARY_NAME ${test_name}
SOURCES ${cluster_test_src}
LIBRARIES
v::gtest_main
v::storage_test_utils
v::cluster
v::http_test_utils
v::raft
v::raft_fixture
LABELS cluster
)
endforeach()

set(srcs
manual_log_deletion_test.cc
cluster_tests.cc
Expand Down
157 changes: 103 additions & 54 deletions src/v/cluster/tests/id_allocator_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "base/outcome.h"
#include "base/vassert.h"
#include "cluster/id_allocator_stm.h"
#include "config/configuration.h"
#include "model/fundamental.h"
Expand All @@ -22,112 +23,160 @@
#include "test_utils/async.h"
#include "test_utils/fixture.h"
#include "test_utils/scoped_config.h"
#include "tests/stm_test_fixture.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/log.hh>

#include <boost/range/irange.hpp>
#include <boost/test/tools/old/interface.hpp>

#include <thread>

using namespace std::chrono_literals;

namespace cluster {
namespace {

ss::logger idstmlog{"idstm-test"};

struct id_allocator_stm_fixture : simple_raft_fixture {
void create_stm_and_start_raft() {
struct id_allocator_stm_fixture : state_machine_fixture {
ss::future<> initialize_state_machines() {
// set configuration parameters
test_local_cfg.get("id_allocator_batch_size").set_value(int16_t(1));
test_local_cfg.get("id_allocator_log_capacity").set_value(int16_t(2));
create_raft();
raft::state_machine_manager_builder stm_m_builder;

_stm = stm_m_builder.create_stm<cluster::id_allocator_stm>(
idstmlog, _raft.get(), config::shard_local_cfg());
create_nodes();
co_await start_nodes();
}

ss::future<> start_node(raft_node_instance& node) {
co_await node.initialise(all_vnodes());
raft::state_machine_manager_builder builder;
auto stm = builder.create_stm<cluster::id_allocator_stm>(
idstmlog, node.raft().get(), config::shard_local_cfg());
co_await node.start(std::move(builder));
node_stms.emplace(node.get_vnode(), std::move(stm));
}

ss::future<> start_nodes() {
co_await parallel_for_each_node(
[this](raft_node_instance& node) { return start_node(node); });
}

ss::future<> stop_and_recreate_nodes() {
absl::flat_hash_map<model::node_id, ss::sstring> data_directories;
for (auto& [id, node] : nodes()) {
data_directories[id]
= node->raft()->log()->config().base_directory();
node_stms.erase(node->get_vnode());
}

co_await ss::parallel_for_each(
std::views::keys(data_directories),
[this](model::node_id id) { return stop_node(id); });

for (auto& [id, data_dir] : data_directories) {
add_node(id, model::revision_id(0), std::move(data_dir));
}
}

ss::future<> reset(int64_t id) {
auto result = co_await retry_with_leader(
model::timeout_clock::now() + 30s,
[this, id](raft_node_instance& leader_node) {
auto stm = node_stms[leader_node.get_vnode()];
return stm->reset_next_id(id, 1s);
});
vassert(result.has_value(), "raft error");
vassert(result.assume_value() == id, "unexpected value");
}

_raft->start(std::move(stm_m_builder)).get();
_started = true;
ss::future<int64_t> allocate1(int64_t cur_last_id) {
auto result = co_await retry_with_leader(
model::timeout_clock::now() + 30s,
[this](raft_node_instance& leader_node) {
auto stm = node_stms[leader_node.get_vnode()];
return stm->allocate_id(1s);
});
vassert(result.has_value(), "raft error");
vassert(result.assume_value() > cur_last_id, "unexpected value");
cur_last_id = result.assume_value();
co_return cur_last_id;
}

// Allocates n IDs, ensuring that each one new ID is greater than the
// previous one, starting with 'cur_last_id'.
// Returns the last allocated ID.
int64_t allocate_n(int64_t cur_last_id, int n) {
// One by one to be able to inject raft quirks in future.
ss::future<int64_t> allocate_n(int64_t cur_last_id, int n) {
for (int i = 0; i < n; i++) {
auto result = _stm->allocate_id(1s).get0();

BOOST_REQUIRE_EQUAL(result.has_value(), true);
BOOST_REQUIRE_LT(cur_last_id, result.assume_value());

cur_last_id = result.assume_value();
cur_last_id = co_await allocate1(cur_last_id);
}
return cur_last_id;
co_return cur_last_id;
}

ss::shared_ptr<cluster::id_allocator_stm> _stm;
absl::flat_hash_map<raft::vnode, ss::shared_ptr<cluster::id_allocator_stm>>
node_stms;
scoped_config test_local_cfg;
};

FIXTURE_TEST(stm_monotonicity_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
} // namespace

TEST_F_CORO(id_allocator_stm_fixture, stm_monotonicity_test) {
co_await initialize_state_machines();

int64_t last_id = -1;
allocate_n(last_id, 5);
co_await allocate_n(last_id, 5);
}

FIXTURE_TEST(stm_restart_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
TEST_F_CORO(id_allocator_stm_fixture, stm_restart_test) {
co_await initialize_state_machines();

int64_t last_id = -1;

last_id = allocate_n(last_id, 5);
stop_all();
create_stm_and_start_raft();
wait_for_confirmed_leader();

allocate_n(last_id, 5);
last_id = co_await allocate_n(last_id, 5);
co_await stop_and_recreate_nodes();
co_await start_nodes();
co_await allocate_n(last_id, 5);
}

FIXTURE_TEST(stm_reset_id_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
TEST_F_CORO(id_allocator_stm_fixture, stm_reset_id_test) {
co_await initialize_state_machines();

int64_t last_id = -1;
allocate_n(last_id, 5);
co_await allocate_n(last_id, 5);

// Reset to 100.
last_id = 100;
_stm->reset_next_id(last_id + 1, 1s).get();
last_id = allocate_n(last_id, 5);
BOOST_REQUIRE_EQUAL(last_id, 105);
co_await reset(last_id + 1);
last_id = co_await allocate_n(last_id, 5);
ASSERT_EQ_CORO(last_id, 105);

// Even after restarting, the starting point should be where we left off.
stop_all();
create_stm_and_start_raft();
wait_for_confirmed_leader();
co_await stop_and_recreate_nodes();
co_await start_nodes();

last_id = allocate_n(last_id, 5);
BOOST_REQUIRE_EQUAL(last_id, 110);
last_id = co_await allocate_n(last_id, 5);
ASSERT_EQ_CORO(last_id, 110);
}

FIXTURE_TEST(stm_reset_batch_test, id_allocator_stm_fixture) {
create_stm_and_start_raft();
wait_for_confirmed_leader();
TEST_F_CORO(id_allocator_stm_fixture, stm_reset_batch_test) {
co_await initialize_state_machines();
int64_t last_id = -1;
allocate_n(last_id, 5);
co_await allocate_n(last_id, 5);

last_id = 100;
_stm->reset_next_id(last_id + 1, 1s).get();
co_await reset(last_id + 1);

// After a leadership change, the reset should still take effect. However,
// it should be offset by one batch.
_raft->step_down("test").get();
wait_for_confirmed_leader();
last_id = allocate_n(last_id, 1);
BOOST_REQUIRE_EQUAL(last_id, 102);
co_await with_leader(1s, [](raft_node_instance& leader_node) {
return leader_node.raft()->step_down("test");
});

last_id = co_await allocate_n(last_id, 1);
vassert(last_id == 102, "unexpected value");
}

} // namespace cluster

0 comments on commit 950e8dc

Please sign in to comment.