Merge pull request redpanda-data#16774 from Lazin/feature/propagate-archival-stm-apply-error

archival: Improve archival STM concurrency control
dotnwat authored Mar 23, 2024
2 parents 9bacf66 + a07f8fb commit 0d37b5a
Showing 12 changed files with 600 additions and 243 deletions.
493 changes: 277 additions & 216 deletions src/v/archival/archival_metadata_stm.cc

Large diffs are not rendered by default.

51 changes: 40 additions & 11 deletions src/v/archival/archival_metadata_stm.h
@@ -14,20 +14,26 @@
#include "cloud_storage/fwd.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/types.h"
#include "cluster/errc.h"
#include "cluster/state_machine_registry.h"
#include "features/fwd.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "model/timeout_clock.h"
#include "raft/persisted_stm.h"
#include "storage/record_batch_builder.h"
#include "utils/mutex.h"
#include "utils/prefix_logger.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/util/log.hh>
#include <seastar/util/noncopyable_function.hh>

#include <exception>
#include <functional>
#include <system_error>

@@ -36,6 +42,7 @@ namespace cluster {
namespace details {
/// This class is supposed to be implemented in unit tests.
class archival_metadata_stm_accessor;
class command_batch_builder_accessor;
} // namespace details

class archival_metadata_stm;
@@ -45,6 +52,8 @@ using segment_validated = ss::bool_class<struct segment_validated_tag>;
/// Batch builder allows to combine different archival_metadata_stm commands
/// together in a single record batch
class command_batch_builder {
friend class command_batch_builder_accessor;

public:
command_batch_builder(
archival_metadata_stm& stm,
@@ -85,6 +94,18 @@ class command_batch_builder {
cloud_storage::scrub_status status,
cloud_storage::anomalies detected);
command_batch_builder& reset_scrubbing_metadata();
/// Add read-write fence
///
/// The fence prevents all subsequent commands in the batch from being
/// applied to the in-memory state of the STM if the STM's applied offset is
/// greater than 'offset'. It is intended as an optimistic
/// concurrency-control mechanism.
command_batch_builder& read_write_fence(model::offset offset);

/// Add update_highest_producer_id_cmd command
command_batch_builder&
update_highest_producer_id(model::producer_id highest_pid);

/// Replicate the configuration batch
ss::future<std::error_code> replicate();
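
For context, a minimal usage sketch of the new fence (the helper function below is hypothetical; the builder calls, types, and error codes are the ones this diff introduces and exercises in the gtest further down):

// Hypothetical sketch, not part of this PR: replicate a command batch guarded
// by a read-write fence taken from the currently applied manifest state.
ss::future<std::error_code> add_segments_with_fence(
  cluster::archival_metadata_stm& stm,
  std::vector<cloud_storage::segment_meta> segments,
  ss::lowres_clock::time_point deadline,
  ss::abort_source& as) {
    // Record the offset of the last command applied to the in-memory state.
    auto fence = stm.manifest().get_applied_offset();
    // If a concurrent writer advances the applied offset before this batch is
    // applied, the whole batch is rejected with
    // cluster::errc::concurrent_modification_error instead of being applied
    // on top of stale state.
    co_return co_await stm.batch_start(deadline, as)
      .read_write_fence(fence)
      .add_segments(std::move(segments), cluster::segment_validated::yes)
      .replicate();
}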

@@ -212,10 +233,12 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
model::term_id term,
const cloud_storage::partition_manifest& manifest);

// Attempts to bring the archival STM in sync with the log.
// Returns "true" if it has synced succesfully *and* the replica
// is still the leader with the correct term.
ss::future<bool> sync(model::timeout_clock::duration timeout);
/// This method guarantees that the STM has applied all changes
/// in the log to its in-memory state.
ss::future<std::optional<model::offset>>
sync(model::timeout_clock::duration timeout);
ss::future<std::optional<model::offset>>
sync(model::timeout_clock::duration timeout, ss::abort_source* as);

model::offset get_start_offset() const;
model::offset get_last_offset() const;
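
A minimal caller sketch for the new sync() contract (hypothetical wrapper; the pattern mirrors the ntp_archiver changes below, where a disengaged optional means the sync did not complete and the caller should re-check its loop conditions):

// Hypothetical helper: std::nullopt means the STM could not be brought in
// sync within the timeout (e.g. leadership moved), so the caller should back
// off and retry rather than proceed with stale state.
ss::future<bool> try_sync_stm(
  cluster::archival_metadata_stm& stm,
  model::timeout_clock::duration timeout,
  ss::abort_source& as) {
    auto synced = co_await stm.sync(timeout, &as);
    co_return synced.has_value();
}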
@@ -228,7 +251,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
fragmented_vector<cloud_storage::partition_manifest::lw_segment_meta>
get_segments_to_cleanup() const;

/// Create batch builder that can be used to combine and replicate multipe
/// Create batch builder that can be used to combine and replicate multiple
/// STM commands together
command_batch_builder
batch_start(ss::lowres_clock::time_point deadline, ss::abort_source&);
@@ -251,6 +274,9 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

private:
ss::future<bool>
do_sync(model::timeout_clock::duration timeout, ss::abort_source* as);

ss::future<std::error_code> do_add_segments(
std::vector<cloud_storage::segment_meta>,
std::optional<model::offset> clean_offset,
@@ -288,6 +314,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
struct process_anomalies_cmd;
struct reset_scrubbing_metadata;
struct update_highest_producer_id_cmd;
struct read_write_fence_cmd;
struct snapshot;

friend segment segment_from_meta(const cloud_storage::segment_meta& meta);
@@ -316,6 +343,13 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
void apply_process_anomalies(iobuf);
void apply_reset_scrubbing_metadata();
void apply_update_highest_producer_id(model::producer_id pid);
// apply fence command and return true if the 'apply' call should
// be interrupted
bool apply_read_write_fence(const read_write_fence_cmd&) noexcept;

// Notify current waiter in the 'do_replicate'
void maybe_notify_waiter(cluster::errc) noexcept;
void maybe_notify_waiter(std::exception_ptr) noexcept;

private:
prefix_logger _logger;
@@ -332,12 +366,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
// The offset of the last record that modified this stm
model::offset _last_dirty_at;

// The last replication future
struct last_replicate {
model::term_id term;
ss::shared_future<result<raft::replicate_result>> result;
};
std::optional<last_replicate> _last_replicate;
std::optional<ss::promise<errc>> _active_operation_res;

cloud_storage::remote& _cloud_storage_api;
features::feature_table& _feature_table;
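
The cached last-replicate future is replaced by a single active-operation promise; the comments above indicate do_replicate() waits on it. A sketch of how the maybe_notify_waiter helpers might resolve it (an assumption only; the real implementation lives in archival_metadata_stm.cc, whose diff is not rendered on this page):

// Sketch under the assumption that do_replicate() parks a promise in
// _active_operation_res and awaits its future; the waiter is resolved at most
// once and the slot is cleared so a late notification becomes a no-op.
void archival_metadata_stm::maybe_notify_waiter(cluster::errc ec) noexcept {
    if (_active_operation_res.has_value()) {
        _active_operation_res->set_value(ec);
        _active_operation_res.reset();
    }
}
void archival_metadata_stm::maybe_notify_waiter(std::exception_ptr e) noexcept {
    if (_active_operation_res.has_value()) {
        _active_operation_res->set_exception(std::move(e));
        _active_operation_res.reset();
    }
}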
8 changes: 4 additions & 4 deletions src/v/archival/ntp_archiver_service.cc
@@ -362,9 +362,9 @@ ss::future<> ntp_archiver::upload_until_abort() {
vlog(_rtclog.debug, "upload loop starting in term {}", _start_term);
auto sync_timeout = config::shard_local_cfg()
.cloud_storage_metadata_sync_timeout_ms.value();
bool is_synced = co_await _parent.archival_meta_stm()->sync(
auto is_synced = co_await _parent.archival_meta_stm()->sync(
sync_timeout);
if (!is_synced) {
if (!is_synced.has_value()) {
continue;
}
vlog(_rtclog.debug, "upload loop synced in term {}", _start_term);
@@ -667,9 +667,9 @@ ss::future<> ntp_archiver::upload_until_term_change() {

auto sync_timeout = config::shard_local_cfg()
.cloud_storage_metadata_sync_timeout_ms.value();
bool is_synced = co_await _parent.archival_meta_stm()->sync(
auto is_synced = co_await _parent.archival_meta_stm()->sync(
sync_timeout);
if (!is_synced) {
if (!is_synced.has_value()) {
// This can happen on leadership changes, or on timeouts waiting
// for stm to catch up: in either case, we should re-check our
// loop condition: we will drop out if lost leadership, otherwise
190 changes: 187 additions & 3 deletions src/v/archival/tests/archival_metadata_stm_gtest.cc
@@ -13,10 +13,13 @@
#include "cloud_storage_clients/client_pool.h"
#include "model/record.h"
#include "raft/tests/raft_fixture.h"
#include "test_utils/scoped_config.h"
#include "test_utils/test.h"
#include "utils/available_promise.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
@@ -123,6 +126,17 @@ class archival_metadata_stm_gtest_fixture : public raft::raft_fixture {
return *ptr;
}

ss::future<> wait_for_apply() {
auto committed_offset = co_await with_leader(
10s, [](auto& node) { return node.raft()->committed_offset(); });

co_await parallel_for_each_node([committed_offset](auto& node) {
return node.raft()->stm_manager()->wait(
committed_offset, model::no_timeout);
});
co_return;
}

private:
std::array<archival_stm_node, node_count> _archival_stm_nodes;
};
@@ -171,6 +185,8 @@ TEST_F_CORO(archival_metadata_stm_gtest_fixture, test_archival_stm_happy_path) {
ASSERT_EQ_CORO(
get_leader_stm().get_dirty(),
cluster::archival_metadata_stm::state_dirty::clean);

co_await wait_for_apply();
}

TEST_F_CORO(
@@ -261,7 +277,6 @@ TEST_F_CORO(
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}

return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(sync_result_before_replication);
@@ -272,7 +287,6 @@ TEST_F_CORO(
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}

return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(second_sync_result_before_replication);
@@ -286,7 +300,6 @@ TEST_F_CORO(
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}

return get_leader_stm().sync(10s);
});

@@ -303,4 +316,175 @@

ASSERT_EQ_CORO(committed_offset, model::offset{2});
ASSERT_EQ_CORO(term, model::term_id{1});

co_await wait_for_apply();
}

TEST_F_CORO(
archival_metadata_stm_gtest_fixture, test_archival_stm_error_propagation) {
ss::abort_source never_abort;

auto s_cfg = scoped_config{};
s_cfg.get("cloud_storage_disable_metadata_consistency_checks")
.set_value(false);
co_await start();

std::vector<cloud_storage::segment_meta> good_segment;
good_segment.push_back(segment_meta{
.base_offset = model::offset(0),
.committed_offset = model::offset(99),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

co_await wait_for_leader(10s);
auto timeout = 30s;
auto deadline = ss::lowres_clock::now() + timeout;

ASSERT_EQ_CORO(
get_leader_stm().get_dirty(),
cluster::archival_metadata_stm::state_dirty::dirty);

auto is_synced = co_await get_leader_stm().sync(timeout);

ASSERT_TRUE_CORO(is_synced);

auto repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.add_segments(
good_segment, cluster::segment_validated::yes)
.replicate();

ASSERT_EQ_CORO(repl_err, cluster::errc::success);

ASSERT_EQ_CORO(get_leader_stm().manifest().size(), 1);
ASSERT_EQ_CORO(
get_leader_stm().manifest().begin()->base_offset, model::offset(0));
ASSERT_EQ_CORO(
get_leader_stm().manifest().begin()->committed_offset, model::offset(99));

ASSERT_EQ_CORO(
get_leader_stm().get_dirty(),
cluster::archival_metadata_stm::state_dirty::dirty);

repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.mark_clean(get_leader_stm().manifest().get_insync_offset())
.replicate();
ASSERT_EQ_CORO(repl_err, cluster::errc::success);

ASSERT_EQ_CORO(
get_leader_stm().get_dirty(),
cluster::archival_metadata_stm::state_dirty::clean);

// Attempt to replicate incorrect record batch
std::vector<cloud_storage::segment_meta> poisoned_segment;
poisoned_segment.push_back(segment_meta{
.base_offset = model::offset(101),
.committed_offset = model::offset(999),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.add_segments(
std::move(poisoned_segment), cluster::segment_validated::yes)
.replicate();
ASSERT_EQ_CORO(repl_err, cluster::errc::inconsistent_stm_update);

// Check that it still works with consistent updates
good_segment.clear();
good_segment.push_back(segment_meta{
.base_offset = model::offset(100),
.committed_offset = model::offset(999),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.add_segments(good_segment, cluster::segment_validated::yes)
.replicate();
ASSERT_EQ_CORO(repl_err, cluster::errc::success);

co_await wait_for_apply();
}

TEST_F_CORO(
archival_metadata_stm_gtest_fixture, test_archival_stm_read_write_fence) {
ss::abort_source never_abort;
auto timeout = 30s;
auto deadline = ss::lowres_clock::now() + timeout;

auto s_cfg = scoped_config{};
s_cfg.get("cloud_storage_disable_metadata_consistency_checks")
.set_value(false);
co_await start();

std::vector<cloud_storage::segment_meta> good_segment;
good_segment.push_back(segment_meta{
.base_offset = model::offset(0),
.committed_offset = model::offset(99),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

co_await wait_for_leader(10s);

ASSERT_EQ_CORO(
get_leader_stm().get_dirty(),
cluster::archival_metadata_stm::state_dirty::dirty);

auto is_synced = co_await get_leader_stm().sync(timeout);

ASSERT_TRUE_CORO(is_synced);

auto applied_offset = get_leader_stm().manifest().get_applied_offset();

auto repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.read_write_fence(applied_offset)
.add_segments(
good_segment, cluster::segment_validated::yes)
.replicate();
ASSERT_EQ_CORO(repl_err, cluster::errc::success);

ASSERT_EQ_CORO(get_leader_stm().manifest().size(), 1);
ASSERT_EQ_CORO(
get_leader_stm().manifest().begin()->base_offset, model::offset(0));
ASSERT_EQ_CORO(
get_leader_stm().manifest().begin()->committed_offset, model::offset(99));

// It's guaranteed that the batch is already applied to the STM
applied_offset = get_leader_stm().manifest().get_applied_offset();
ASSERT_TRUE_CORO(applied_offset > model::offset(0));

good_segment.clear();
good_segment.push_back(segment_meta{
.base_offset = model::offset(100),
.committed_offset = model::offset(199),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.read_write_fence(applied_offset)
.add_segments(good_segment, cluster::segment_validated::yes)
.replicate();
ASSERT_EQ_CORO(repl_err, cluster::errc::success);

// Emulate concurrency violation
applied_offset = model::offset{0};
good_segment.clear();
good_segment.push_back(segment_meta{
.base_offset = model::offset(200),
.committed_offset = model::offset(299),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

repl_err = co_await get_leader_stm()
.batch_start(deadline, never_abort)
.read_write_fence(applied_offset)
.add_segments(good_segment, cluster::segment_validated::yes)
.replicate();
ASSERT_EQ_CORO(repl_err, cluster::errc::concurrent_modification_error);

co_await wait_for_apply();
}