Commit fe83597

Merge pull request redpanda-data#17112 from mmaslankaprv/fix-stm-manager
Fixed not applying snapshot for new stms
mmaslankaprv authored Mar 27, 2024
2 parents 1d4f70b + b700b95 commit fe83597
Showing 4 changed files with 229 additions and 23 deletions.
48 changes: 29 additions & 19 deletions src/v/raft/state_machine_manager.cc
@@ -251,11 +251,11 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot(
           _log.debug,
           "applying empty snapshot at offset: {} for backward "
           "compatibility",
-          metadata.last_included_index);
+          last_offset);
         co_await ss::coroutine::parallel_for_each(
-          _machines, [metadata, last_offset](auto& pair) {
+          _machines, [last_offset](auto& pair) {
               auto stm = pair.second->stm;
-              if (stm->last_applied_offset() >= metadata.last_included_index) {
+              if (stm->last_applied_offset() >= last_offset) {
                   return ss::now();
               }
               return stm->apply_raft_snapshot(iobuf{}).then([stm, last_offset] {
@@ -269,27 +269,37 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot(
     auto snap = co_await serde::read_async<managed_snapshot>(parser);

     co_await ss::coroutine::parallel_for_each(
-      snap.snapshot_map,
-      [this, metadata, last_offset](auto& snapshot_pair) {
-          auto it = _machines.find(snapshot_pair.first);
-          if (
-            it == _machines.end()
-            || it->second->stm->last_applied_offset()
-                 >= metadata.last_included_index) {
-              return ss::now();
-          }
-
-          return it->second->stm
-            ->apply_raft_snapshot(std::move(snapshot_pair.second))
-            .then([stm = it->second->stm, last_offset] {
-                stm->set_next(
-                  std::max(model::next_offset(last_offset), stm->next()));
-            });
+      _machines,
+      [this, snap = std::move(snap), last_offset](
+        state_machines_t::value_type& stm_pair) mutable {
+          return apply_snapshot_to_stm(stm_pair.second, snap, last_offset);
       });
     }
     _next = model::next_offset(metadata.last_included_index);
 }

+ss::future<> state_machine_manager::apply_snapshot_to_stm(
+  ss::lw_shared_ptr<state_machine_entry> stm_entry,
+  const managed_snapshot& snapshot,
+  model::offset last_offset) {
+    auto it = snapshot.snapshot_map.find(stm_entry->name);
+
+    if (stm_entry->stm->last_applied_offset() < last_offset) {
+        if (it != snapshot.snapshot_map.end()) {
+            co_await stm_entry->stm->apply_raft_snapshot(it->second);
+        } else {
+            /**
+             * In order to hold the stm contract we need to call the
+             * apply_raft_snapshot with empty data
+             */
+            co_await stm_entry->stm->apply_raft_snapshot(iobuf{});
+        }
+    }
+
+    stm_entry->stm->set_next(
+      std::max(model::next_offset(last_offset), stm_entry->stm->next()));
+}
+
 ss::future<> state_machine_manager::apply() {
     try {
         ss::coroutine::switch_to sg_sw(_apply_sg);
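In essence, the fix makes snapshot application iterate over every registered state machine instead of only the entries present in the snapshot map. A minimal sketch of the resulting contract, using simplified stand-in types (stm_iface, std::string buffers, and raw int64_t offsets are assumptions for illustration, not Redpanda's actual API):

// Sketch only: simplified stand-ins for state_machine_manager's logic,
// assuming hypothetical stm_iface/snapshot types.
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct stm_iface {
    std::string name;
    int64_t last_applied = -1;
    int64_t next = 0;
    void apply_raft_snapshot(const std::string& data) {
        // An empty buffer is a valid call: it tells the STM that the
        // snapshot carries nothing for it (e.g. the STM was added later).
        if (!data.empty()) { /* restore state from data */ }
    }
};

// After the fix: every registered STM is visited, not only those with an
// entry in the snapshot map, so a newly added STM still advances `next`.
void apply_raft_snapshot_to_all(
  std::vector<stm_iface>& machines,
  const std::map<std::string, std::string>& snapshot_map,
  int64_t last_included_offset) {
    for (auto& stm : machines) {
        if (stm.last_applied < last_included_offset) {
            auto it = snapshot_map.find(stm.name);
            stm.apply_raft_snapshot(
              it == snapshot_map.end() ? std::string{} : it->second);
        }
        // Move next past the snapshot so the STM can catch up from there.
        stm.next = std::max(last_included_offset + 1, stm.next);
    }
}

Before the change, the loop ranged over snap.snapshot_map, so an STM with no entry in the snapshot never received the empty apply_raft_snapshot call and never had its next offset advanced past the snapshot.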
5 changes: 5 additions & 0 deletions src/v/raft/state_machine_manager.h
@@ -174,6 +174,11 @@ class state_machine_manager final {
         auto serde_fields() { return std::tie(snapshot_map); }
     };

+    ss::future<> apply_snapshot_to_stm(
+      ss::lw_shared_ptr<state_machine_entry> stm_entry,
+      const managed_snapshot& snapshot,
+      model::offset last_included_offset);
+
     consensus* _raft;
     ctx_log _log;
     mutex _apply_mutex{"stm_manager::apply"};
110 changes: 106 additions & 4 deletions src/v/raft/tests/persisted_stm_test.cc
@@ -171,12 +171,13 @@ class persisted_kv : public persisted_stm<> {
           0, last_applied_offset(), serde::to_iobuf(state));
     };

-    static void
+    static std::optional<kv_operation>
     apply_to_state(const model::record_batch& batch, kv_state& state) {
         if (batch.header().type != model::record_batch_type::raft_data) {
-            return;
+            return std::nullopt;
         }
-        batch.for_each_record([&state](model::record r) {
+        kv_operation last_op;
+        batch.for_each_record([&state, &last_op](model::record r) {
             auto op = serde::from_iobuf<kv_operation>(r.value().copy());
             /**
              * Here we check if validation pre replication is correct.
@@ -206,11 +207,16 @@ class persisted_kv : public persisted_stm<> {
                     state.remove(op.key);
                 }
             }
+            last_op = op;
         });
+        return last_op;
     }

     ss::future<> apply(const model::record_batch& batch) override {
-        apply_to_state(batch, state);
+        auto last_op = apply_to_state(batch, state);
+        if (last_op) {
+            last_operation = std::move(*last_op);
+        }
         co_return;
     }

@@ -265,9 +271,38 @@ class persisted_kv : public persisted_stm<> {
     }

     kv_state state;
+    kv_operation last_operation;
     raft_node_instance& raft_node;
 };

+class other_persisted_kv : public persisted_kv {
+public:
+    static constexpr std::string_view name = "other_persited_kv_stm";
+    explicit other_persisted_kv(raft_node_instance& rn)
+      : persisted_kv(rn) {}
+    ss::future<> apply_raft_snapshot(const iobuf& buffer) override {
+        if (buffer.empty()) {
+            co_return;
+        }
+        state = serde::from_iobuf<kv_state>(buffer.copy());
+        co_return;
+    };
+    /**
+     * This STM doesn't execute the full apply logic from the base
+     * persisted_kv as it is going to be started without the full data in
+     * the snapshot, hence the validation would fail.
+     */
+    ss::future<> apply(const model::record_batch& batch) override {
+        if (batch.header().type != model::record_batch_type::raft_data) {
+            co_return;
+        }
+        batch.for_each_record([this](model::record r) {
+            last_operation = serde::from_iobuf<kv_operation>(r.value().copy());
+        });
+        co_return;
+    }
+};
+
 struct persisted_stm_test_fixture : state_machine_fixture {
     ss::future<> initialize_state_machines() {
         create_nodes();
@@ -549,3 +584,70 @@ TEST_F_CORO(persisted_stm_test_fixture, test_raft_and_local_snapshot) {
         ASSERT_EQ_CORO(stm->state, expected);
     }
 }
+/**
+ * Tests the scenario in which an STM is added to the partition after the
+ * partition was already alive and a Raft snapshot was taken on it.
+ *
+ * The snapshot doesn't contain data for the newly created STM; however, the
+ * STM's next offset should still be updated to make it possible for the STM
+ * to catch up.
+ */
+TEST_F_CORO(persisted_stm_test_fixture, test_adding_state_machine) {
+    co_await initialize_state_machines();
+    kv_state expected;
+    auto ops = random_operations(2000);
+    for (auto batch : ops) {
+        co_await apply_operations(expected, std::move(batch));
+    }
+    co_await wait_for_apply();
+    for (const auto& [_, stm] : node_stms) {
+        ASSERT_EQ_CORO(stm->state, expected);
+    }
+
+    // take a local snapshot on every node
+    co_await take_local_snapshot_on_every_node();
+    // update state
+    auto ops_phase_two = random_operations(50);
+    for (auto batch : ops_phase_two) {
+        co_await apply_operations(expected, std::move(batch));
+    }
+
+    co_await wait_for_apply();
+    for (const auto& [_, stm] : node_stms) {
+        ASSERT_EQ_CORO(stm->state, expected);
+    }
+
+    // take a Raft snapshot on every node; there are two possibilities here:
+    // the snapshot will be taken either at an offset preceding the current
+    // local snapshot or at one following it.
+    co_await take_raft_snapshot_all_nodes();
+
+    auto committed = node(model::node_id(0)).raft()->committed_offset();
+
+    absl::flat_hash_map<model::node_id, ss::sstring> data_directories;
+    for (auto& [id, node] : nodes()) {
+        data_directories[id] = node->raft()->log()->config().base_directory();
+    }
+
+    for (auto& [id, data_dir] : data_directories) {
+        co_await stop_node(id);
+        add_node(id, model::revision_id(0), std::move(data_dir));
+    }
+    ss::shared_ptr<other_persisted_kv> other_stm;
+    for (auto& [_, node] : nodes()) {
+        co_await node->initialise(all_vnodes());
+        raft::state_machine_manager_builder builder;
+        auto stm = builder.create_stm<persisted_kv>(*node);
+        other_stm = builder.create_stm<other_persisted_kv>(*node);
+        co_await node->start(std::move(builder));
+        node_stms.emplace(node->get_vnode(), std::move(stm));
+    }
+
+    co_await wait_for_committed_offset(committed, 30s);
+    co_await wait_for_apply();
+
+    for (const auto& [_, stm] : node_stms) {
+        ASSERT_EQ_CORO(stm->state, expected);
+        ASSERT_EQ_CORO(stm->last_operation, other_stm->last_operation);
+    }
+}
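Why advancing next is sufficient for the late-added STM to converge: once next points past the snapshot, the manager's normal apply path feeds it every subsequent committed batch. A rough, hypothetical sketch of that catch-up (stm, log_read, and committed are assumed names for illustration, not the fixture's API):

// Hypothetical catch-up: with next already moved past the snapshot, the
// newly added STM just applies each committed batch from next onward.
while (stm.next <= committed) {
    stm.apply(log_read(stm.next)); // assumed single-batch read at `next`
    ++stm.next;
}

This is exactly what the final assertions check: other_stm saw the same last operation as the STMs that had the full snapshot data.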
89 changes: 89 additions & 0 deletions tests/rptest/tests/tiered_storage_enable_test.py
@@ -0,0 +1,89 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
from rptest.clients.types import TopicSpec
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.redpanda import SISettings

from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.utils.mode_checks import skip_debug_mode


class TestEnablingTieredStorage(PreallocNodesTest):
def __init__(self, test_context):
super().__init__(test_context,
num_brokers=3,
node_prealloc_count=1,
si_settings=SISettings(test_context=test_context,
fast_uploads=True))

@property
def producer_throughput(self):
return 5 * (1024 * 1024) if not self.debug_mode else 1000

@property
def msg_count(self):
return 20 * int(self.producer_throughput / self.msg_size)

@property
def msg_size(self):
return 128

def start_producer(self):
self.logger.info(
f"starting kgo-verifier producer with {self.msg_count} messages of size {self.msg_size} and throughput: {self.producer_throughput} bps"
)
self.producer = KgoVerifierProducer(
self.test_context,
self.redpanda,
self._topic,
self.msg_size,
self.msg_count,
custom_node=self.preallocated_nodes,
rate_limit_bps=self.producer_throughput)

self.producer.start(clean=False)
self.producer.wait_for_acks(
5 * (self.producer_throughput / self.msg_size), 120, 1)

@cluster(num_nodes=4)
@skip_debug_mode
def test_enabling_tiered_storage_on_old_topic(self):
# disable cloud storage and restart cluster
self.redpanda.set_cluster_config({"cloud_storage_enabled": False},
expect_restart=True)
# create topic without tiered storage enabled
topic = TopicSpec(partition_count=3,
segment_bytes=1024 * 1024,
retention_bytes=5 * 1024 * 1024)

self.client().create_topic(topic)
self._topic = topic.name
self.start_producer()
rpk = RpkTool(self.redpanda)

def _start_offset_updated():
partitions = rpk.describe_topic(self._topic)
return all([p.start_offset > 0 for p in partitions])

wait_until(
_start_offset_updated,
timeout_sec=60,
backoff_sec=1,
err_msg=
"timed out waiting for local retention to clean up some some data")

# enable cloud storage
self.redpanda.set_cluster_config({"cloud_storage_enabled": True},
expect_restart=True)

self.redpanda.wait_for_manifest_uploads()
