Merge pull request #18760 from mmaslankaprv/group-recovery-test
Added basic test validating consumer group recovery
mmaslankaprv authored Jun 5, 2024
2 parents 6bc4a00 + a5cc14d commit 4ddb309
Showing 10 changed files with 656 additions and 144 deletions.
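The test added by this PR is in one of the six files whose diffs are not shown below. Conceptually, "consumer group recovery" means rebuilding a group's state by replaying its metadata/offsets log after a restart and checking that the rebuilt group exposes the same committed offsets it had before. A minimal, self-contained C++ sketch of that invariant (a toy model for orientation only, not Redpanda code and not the actual test):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for an offset-commit record in a group's metadata log.
struct offset_commit {
    std::string topic;
    int32_t partition;
    int64_t offset;
};

// Rebuild the committed-offset table by replaying the log in order; a later
// commit for the same topic/partition overwrites an earlier one.
std::map<std::pair<std::string, int32_t>, int64_t>
recover(const std::vector<offset_commit>& log) {
    std::map<std::pair<std::string, int32_t>, int64_t> offsets;
    for (const auto& rec : log) {
        offsets[{rec.topic, rec.partition}] = rec.offset;
    }
    return offsets;
}

int main() {
    const std::vector<offset_commit> log{
      {"topic-a", 0, 10}, {"topic-a", 0, 42}, {"topic-b", 3, 7}};
    const auto before_restart = recover(log);
    // A "restart" is just a second replay of the same log: recovery must be
    // deterministic and must not lose or reorder committed offsets.
    const auto after_restart = recover(log);
    assert(after_restart == before_restart);
    assert(after_restart.at({"topic-a", 0}) == 42);
    return 0;
}
```

The production counterpart of this replay is group_manager::do_recover_group (see the group_manager.cc hunk below), which this commit updates to feed recovered in-flight transactional offsets (renamed from prepared_txs to ongoing_tx_offsets) back into the live group.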
88 changes: 41 additions & 47 deletions src/v/kafka/server/group.cc
@@ -29,7 +29,6 @@
#include "kafka/protocol/txn_offset_commit.h"
#include "kafka/protocol/wire.h"
#include "kafka/server/group_metadata.h"
#include "kafka/server/group_stm.h"
#include "kafka/server/logger.h"
#include "kafka/server/member.h"
#include "kafka/types.h"
@@ -170,9 +169,9 @@ bool group::valid_previous_state(group_state s) const {

__builtin_unreachable();
}

namespace {
template<typename T>
static model::record_batch make_tx_batch(
model::record_batch make_tx_batch(
model::record_batch_type type,
int8_t version,
const model::producer_identity& pid,
@@ -192,14 +191,15 @@ static model::record_batch make_tx_batch(
return std::move(builder).build();
}

static model::record_batch make_tx_fence_batch(
const model::producer_identity& pid, group_log_fencing cmd) {
model::record_batch make_tx_fence_batch(
const model::producer_identity& pid, group_tx::fence_metadata cmd) {
return make_tx_batch(
model::record_batch_type::tx_fence,
group::fence_control_record_version,
pid,
std::move(cmd));
}
} // namespace

group_state group::set_state(group_state s) {
vassert(
@@ -1662,14 +1662,13 @@ void group::fail_offset_commit(

void group::reset_tx_state(model::term_id term) {
_term = term;
_volatile_txs.clear();
_prepared_txs.clear();
_ongoing_tx_offsets.clear();
_expiration_info.clear();
_tx_data.clear();
_fence_pid_epoch.clear();
}

void group::insert_prepared(prepared_tx tx) {
void group::insert_ongoing_tx_offsets(ongoing_tx_offsets tx) {
auto pid = tx.pid;

// TODO: warn when legacy support is removed and _tx_data doesn't contain
@@ -1680,14 +1679,14 @@ void group::insert_prepared(prepared_tx tx) {
if (txseq_it->second.tx_seq != tx.tx_seq) {
vlog(
_ctx_txlog.warn,
"prepared_tx of pid {} has tx_seq {} while {} expected",
"ongoing tx of pid {} has tx_seq {} while {} expected",
tx.pid,
tx.tx_seq,
txseq_it->second.tx_seq);
}
}

_prepared_txs[pid] = std::move(tx);
_ongoing_tx_offsets[pid] = std::move(tx);
}

ss::future<cluster::commit_group_tx_reply>
@@ -1744,15 +1743,15 @@ group::commit_tx(cluster::commit_group_tx_request r) {
co_return make_commit_tx_reply(cluster::tx_errc::request_rejected);
}

auto prepare_it = _prepared_txs.find(r.pid);
if (prepare_it == _prepared_txs.end()) {
auto ongoing_it = _ongoing_tx_offsets.find(r.pid);
if (ongoing_it == _ongoing_tx_offsets.end()) {
vlog(
_ctx_txlog.trace,
"can't find a tx {}, probably already comitted",
r.pid);
co_return make_commit_tx_reply(cluster::tx_errc::none);
}
if (prepare_it->second.tx_seq > r.tx_seq) {
if (ongoing_it->second.tx_seq > r.tx_seq) {
// rare situation:
// * tm_stm prepares (tx_seq+1)
// * prepare on this group passed but tm_stm failed to write to disk
@@ -1763,10 +1762,10 @@
"prepare for pid:{} has higher tx_seq:{} than given: {} => replaying "
"already comitted commit",
r.pid,
prepare_it->second.tx_seq,
ongoing_it->second.tx_seq,
r.tx_seq);
co_return make_commit_tx_reply(cluster::tx_errc::none);
} else if (prepare_it->second.tx_seq < r.tx_seq) {
} else if (ongoing_it->second.tx_seq < r.tx_seq) {
co_return make_commit_tx_reply(cluster::tx_errc::request_rejected);
}

@@ -1863,7 +1862,7 @@ group::begin_tx(cluster::begin_group_tx_request r) {
co_return make_begin_tx_reply(
cluster::tx_errc::unknown_server_error);
}
if (_prepared_txs.contains(r.pid)) {
if (_ongoing_tx_offsets.contains(r.pid)) {
vlog(
_ctx_txlog.warn,
"can't begin a tx {} with tx_seq {}: it was already begun and it "
@@ -1876,7 +1875,7 @@
co_return cluster::begin_group_tx_reply(_term, cluster::tx_errc::none);
}

group_log_fencing fence{
group_tx::fence_metadata fence{
.group_id = id(),
.tx_seq = r.tx_seq,
.transaction_timeout_ms = r.timeout,
@@ -2065,13 +2064,13 @@ group::store_txn_offsets(txn_offset_commit_request r) {
}
auto tx_seq = txseq_it->second.tx_seq;

absl::node_hash_map<model::topic_partition, group_log_prepared_tx_offset>
absl::node_hash_map<model::topic_partition, group_tx::partition_offset>
offsets;

auto prepare_it = _prepared_txs.find(pid);
if (prepare_it != _prepared_txs.end()) {
for (const auto& [tp, offset] : prepare_it->second.offsets) {
group_log_prepared_tx_offset md{
auto ongoing_it = _ongoing_tx_offsets.find(pid);
if (ongoing_it != _ongoing_tx_offsets.end()) {
for (const auto& [tp, offset] : ongoing_it->second.offsets) {
group_tx::partition_offset md{
.tp = tp,
.offset = offset.offset,
.leader_epoch = offset.committed_leader_epoch,
@@ -2083,7 +2082,7 @@ group::store_txn_offsets(txn_offset_commit_request r) {
for (const auto& t : r.data.topics) {
for (const auto& p : t.partitions) {
model::topic_partition tp(t.name, p.partition_index);
group_log_prepared_tx_offset md{
group_tx::partition_offset md{
.tp = tp,
.offset = p.committed_offset,
.leader_epoch = p.committed_leader_epoch,
Expand All @@ -2092,8 +2091,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
}
}

auto tx_entry = group_log_prepared_tx{
auto tx_entry = group_tx::offsets_metadata{
.group_id = r.data.group_id, .pid = pid, .tx_seq = tx_seq};
tx_entry.offsets.reserve(offsets.size());

for (const auto& [tp, offset] : offsets) {
tx_entry.offsets.push_back(offset);
@@ -2122,7 +2122,7 @@ group::store_txn_offsets(txn_offset_commit_request r) {
r, error_code::unknown_server_error);
}

prepared_tx ptx;
ongoing_tx_offsets ptx;
ptx.tx_seq = tx_seq;
ptx.pid = pid;
const auto now = model::timestamp::now();
@@ -2137,7 +2137,7 @@ group::store_txn_offsets(txn_offset_commit_request r) {
};
ptx.offsets[tp] = md;
}
_prepared_txs[pid] = ptx;
_ongoing_tx_offsets[pid] = ptx;

auto it = _expiration_info.find(pid);
if (it != _expiration_info.end()) {
@@ -2890,11 +2890,7 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
kafka::group_id group_id,
model::producer_identity pid,
model::tx_seq tx_seq) {
// preventing prepare and replicate once we
// know we're going to abort tx and abandon pid
_volatile_txs.erase(pid);

auto tx = group_log_aborted_tx{.group_id = group_id, .tx_seq = tx_seq};
auto tx = group_tx::abort_metadata{.group_id = group_id, .tx_seq = tx_seq};

auto batch = make_tx_batch(
model::record_batch_type::group_abort_tx,
@@ -2922,7 +2918,7 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
co_return make_abort_tx_reply(cluster::tx_errc::timeout);
}

_prepared_txs.erase(pid);
_ongoing_tx_offsets.erase(pid);
_tx_data.erase(pid.get_id());
_expiration_info.erase(pid);

@@ -2931,8 +2927,8 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(

ss::future<cluster::commit_group_tx_reply>
group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
auto prepare_it = _prepared_txs.find(pid);
if (prepare_it == _prepared_txs.end()) {
auto ongoing_it = _ongoing_tx_offsets.find(pid);
if (ongoing_it == _ongoing_tx_offsets.end()) {
// Impossible situation
vlog(_ctx_txlog.error, "Can not find prepared tx for pid: {}", pid);
co_return make_commit_tx_reply(cluster::tx_errc::unknown_server_error);
Expand All @@ -2954,7 +2950,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {

cluster::simple_batch_builder store_offset_builder(
model::record_batch_type::raft_data, model::offset(0));
for (const auto& [tp, metadata] : prepare_it->second.offsets) {
for (const auto& [tp, metadata] : ongoing_it->second.offsets) {
update_store_offset_builder(
store_offset_builder,
tp.topic,
@@ -2968,7 +2964,7 @@

batches.push_back(std::move(store_offset_builder).build());

group_log_commit_tx commit_tx;
group_tx::commit_metadata commit_tx;
commit_tx.group_id = group_id;
auto batch = make_tx_batch(
model::record_batch_type::group_commit_tx,
@@ -2999,20 +2995,20 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
co_return make_commit_tx_reply(cluster::tx_errc::timeout);
}

prepare_it = _prepared_txs.find(pid);
if (prepare_it == _prepared_txs.end()) {
ongoing_it = _ongoing_tx_offsets.find(pid);
if (ongoing_it == _ongoing_tx_offsets.end()) {
vlog(
_ctx_txlog.error,
"can't find already observed prepared tx pid:{}",
pid);
co_return make_commit_tx_reply(cluster::tx_errc::unknown_server_error);
}

for (const auto& [tp, md] : prepare_it->second.offsets) {
for (const auto& [tp, md] : ongoing_it->second.offsets) {
try_upsert_offset(tp, md);
}

_prepared_txs.erase(prepare_it);
_ongoing_tx_offsets.erase(ongoing_it);
_tx_data.erase(pid.get_id());
_expiration_info.erase(pid);

@@ -3051,12 +3047,10 @@ ss::future<> group::do_abort_old_txes() {
}

std::vector<model::producer_identity> pids;
for (auto& [id, _] : _prepared_txs) {
pids.push_back(id);
}
for (auto& [id, _] : _volatile_txs) {
for (auto& [id, _] : _ongoing_tx_offsets) {
pids.push_back(id);
}

for (auto& [id, _] : _tx_data) {
auto it = _fence_pid_epoch.find(id);
if (it != _fence_pid_epoch.end()) {
@@ -3102,8 +3096,8 @@ ss::future<cluster::tx_errc>
group::do_try_abort_old_tx(model::producer_identity pid) {
vlog(_ctx_txlog.trace, "aborting pid:{}", pid);

auto p_it = _prepared_txs.find(pid);
if (p_it != _prepared_txs.end()) {
auto p_it = _ongoing_tx_offsets.find(pid);
if (p_it != _ongoing_tx_offsets.end()) {
auto tx_seq = p_it->second.tx_seq;
auto tx_data = _tx_data.find(pid.get_id());
model::partition_id tm = model::legacy_tm_ntp.tp.partition;
@@ -3416,7 +3410,7 @@ group::get_expired_offsets(std::chrono::seconds retention_period) {

bool group::has_offsets() const {
return !_offsets.empty() || !_pending_offset_commits.empty()
|| !_volatile_txs.empty() || !_tx_data.empty();
|| !_tx_data.empty();
}

std::vector<model::topic_partition>
31 changes: 6 additions & 25 deletions src/v/kafka/server/group.h
@@ -191,7 +191,7 @@ class group final : public ss::enable_lw_shared_from_this<group> {
}
};

struct prepared_tx {
struct ongoing_tx_offsets {
model::producer_identity pid;
model::tx_seq tx_seq;
absl::node_hash_map<model::topic_partition, offset_metadata> offsets;
@@ -591,7 +591,7 @@ class group final : public ss::enable_lw_shared_from_this<group> {
}
}

void insert_prepared(prepared_tx);
void insert_ongoing_tx_offsets(ongoing_tx_offsets);

void try_set_fence(model::producer_id id, model::producer_epoch epoch) {
auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch);
@@ -828,17 +828,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
}

if (std::any_of(
_volatile_txs.begin(),
_volatile_txs.end(),
[&tp](const auto& tp_info) {
return tp_info.second.offsets.contains(tp);
})) {
return true;
}

if (std::any_of(
_prepared_txs.begin(),
_prepared_txs.end(),
_ongoing_tx_offsets.begin(),
_ongoing_tx_offsets.end(),
[&tp](const auto& tp_info) {
return tp_info.second.offsets.contains(tp);
})) {
@@ -934,19 +925,9 @@ class group final : public ss::enable_lw_shared_from_this<group> {
absl::node_hash_map<model::topic_partition, offset_metadata>
_pending_offset_commits;
enable_group_metrics _enable_group_metrics;
struct volatile_offset {
model::offset offset;
kafka::leader_epoch leader_epoch;
std::optional<ss::sstring> metadata;
};

struct volatile_tx {
model::tx_seq tx_seq;
absl::node_hash_map<model::topic_partition, volatile_offset> offsets;
};

absl::node_hash_map<model::producer_identity, volatile_tx> _volatile_txs;
absl::node_hash_map<model::producer_identity, prepared_tx> _prepared_txs;
absl::node_hash_map<model::producer_identity, ongoing_tx_offsets>
_ongoing_tx_offsets;

struct expiration_info {
expiration_info(model::timeout_clock::duration timeout)
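Taken together, the group.h hunks above drop the volatile_offset/volatile_tx helper structs and the _volatile_txs map, leaving a single per-producer map of in-flight transactional offsets. A condensed view of the resulting declarations, paraphrased from the hunks above (a sketch of the shape only, not the full class; the model:: and absl:: types come from the surrounding Redpanda headers):

```cpp
// Inside class kafka::group (other members omitted):
struct ongoing_tx_offsets {
    model::producer_identity pid;
    model::tx_seq tx_seq;
    absl::node_hash_map<model::topic_partition, offset_metadata> offsets;
};

// One entry per producer with an open transaction touching this group;
// replaces the former _volatile_txs and _prepared_txs pair.
absl::node_hash_map<model::producer_identity, ongoing_tx_offsets>
  _ongoing_tx_offsets;
```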
4 changes: 2 additions & 2 deletions src/v/kafka/server/group_manager.cc
@@ -973,8 +973,8 @@ ss::future<> group_manager::do_recover_group(
});
}

for (const auto& [_, tx] : group_stm.prepared_txs()) {
group->insert_prepared(tx);
for (const auto& [_, tx] : group_stm.ongoing_tx_offsets()) {
group->insert_ongoing_tx_offsets(tx);
}
for (auto& [id, epoch] : group_stm.fences()) {
group->try_set_fence(id, epoch);
8 changes: 8 additions & 0 deletions src/v/kafka/server/group_metadata.cc
@@ -32,6 +32,14 @@

namespace kafka {

group_metadata_kv group_metadata_kv::copy() const {
group_metadata_kv cp{.key = key};
if (value) {
cp.value = value->copy();
}
return cp;
}

/**
* /Kafka
* Messages stored for the group topic has versions for both the key and value
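The only functional addition in group_metadata.cc is group_metadata_kv::copy(), which returns a new key/value pair and copies the optional value only when it is present. A hypothetical usage sketch (the duplicate helper below is invented for illustration and is not part of this diff; it assumes the surrounding kafka/server/group_metadata.h declarations):

```cpp
// Hypothetical helper, not from this PR: duplicate a decoded group metadata
// key/value pair via the new copy() member.
kafka::group_metadata_kv duplicate(const kafka::group_metadata_kv& kv) {
    // The key is always copied; the value is copied only when it is engaged,
    // mirroring the definition shown above.
    return kv.copy();
}
```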
(diffs for the remaining six changed files were not loaded in this view)
