Skip to content

Fix cdc heartbeats reporting unconfirmed volatile transactions as resolved KIKIMR-20962 #1473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class TAlterCdcStreamUnit : public TExecutionUnit {
return EExecutionStatus::DelayCompleteNoMoreRestarts;
}

void Complete(TOperation::TPtr, const TActorContext& ctx) override {
DataShard.EmitHeartbeats(ctx);
void Complete(TOperation::TPtr, const TActorContext&) override {
DataShard.EmitHeartbeats();
}
};

Expand Down
28 changes: 18 additions & 10 deletions ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include "cdc_stream_heartbeat.h"
#include "datashard_impl.h"

#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)

namespace NKikimr::NDataShard {

Expand Down Expand Up @@ -32,7 +32,7 @@ class TDataShard::TTxCdcStreamEmitHeartbeats: public NTabletFlatExecutor::TTrans

TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_EMIT_HEARTBEATS; }

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
bool Execute(TTransactionContext& txc, const TActorContext&) override {
LOG_I("Emit change records"
<< ": edge# " << Edge
<< ", at tablet# " << Self->TabletID());
Expand Down Expand Up @@ -69,16 +69,16 @@ class TDataShard::TTxCdcStreamEmitHeartbeats: public NTabletFlatExecutor::TTrans
return true;
}

void Complete(const TActorContext& ctx) override {
void Complete(const TActorContext&) override {
LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
<< ": at tablet# " << Self->TabletID());
Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->EmitHeartbeats(ctx);
Self->EmitHeartbeats();
}

}; // TTxCdcStreamEmitHeartbeats

void TDataShard::EmitHeartbeats(const TActorContext& ctx) {
void TDataShard::EmitHeartbeats() {
LOG_D("Emit heartbeats"
<< ": at tablet# " << TabletID());

Expand All @@ -92,15 +92,23 @@ void TDataShard::EmitHeartbeats(const TActorContext& ctx) {
}

if (const auto& plan = TransQueue.GetPlan()) {
const auto version = plan.begin()->ToRowVersion();
const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, version), ctx);
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
return;
}

if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
return;
}

const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite), ctx);
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
}

WaitPlanStep(lowest.Next().Step);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/complete_data_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void TCompleteOperationUnit::Complete(TOperation::TPtr op,
DataShard.NotifySchemeshard(ctx, op->GetTxId());

DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats(ctx);
DataShard.EmitHeartbeats();

if (op->HasOutputData()) {
const auto& outReadSets = op->OutReadSets();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/create_cdc_stream_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
if (AddSender) {
ctx.Send(DataShard.GetChangeSender(), AddSender.Release());
DataShard.EmitHeartbeats(ctx);
DataShard.EmitHeartbeats();
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
PromoteFollowerReadEdge();
}

EmitHeartbeats(ctx);
EmitHeartbeats();
}

void TDataShard::CheckMediatorStateRestored() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) {
Self->CreateChangeSender(ctx);
Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->MaybeActivateChangeSender(ctx);
Self->EmitHeartbeats(ctx);
Self->EmitHeartbeats();

if (!Self->ChangesQueue) {
if (!Self->ChangeExchangeSplitter.Done()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ class TDataShard

TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() { return CdcStreamHeartbeatManager; }
const TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() const { return CdcStreamHeartbeatManager; }
void EmitHeartbeats(const TActorContext& ctx);
void EmitHeartbeats();

template <typename... Args>
bool PromoteCompleteEdge(Args&&... args) {
Expand Down
109 changes: 105 additions & 4 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include "datashard_ut_common_kqp.h"

#include <ydb/core/base/path.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
Expand All @@ -22,6 +23,7 @@
namespace NKikimr {

using namespace NDataShard;
using namespace NDataShard::NKqpHelpers;
using namespace Tests;

Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
Expand Down Expand Up @@ -1864,11 +1866,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
while (true) {
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
if (records.size() == expected.size()) {
for (ui32 i = 0; i < expected.size(); ++i) {
AssertJsonsEqual(records.at(i).second, expected.at(i));
}
for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
AssertJsonsEqual(records.at(i).second, expected.at(i));
}

if (records.size() >= expected.size()) {
UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
"Unexpected record: " << records.at(expected.size()).second);
break;
}

Expand Down Expand Up @@ -3157,6 +3161,103 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

// Regression test for CDC heartbeats ("resolved" records) in the presence of
// volatile distributed transactions that complete out of order.
//
// Scenario: a distributed write to two tables is held back by intercepting its
// readsets, a second distributed write to the same tables is then allowed to
// finish, and only afterwards the intercepted readsets are delivered. The
// change streams of both tables must contain the two updates back to back,
// followed by a single "resolved" record, i.e. no "resolved" record may be
// emitted between them while the first write is still unconfirmed.
Y_UNIT_TEST(ResolvedTimestampsVolatileOutOfOrder) {
TPortManager portManager;
// Simulated (non-real-thread) runtime with volatile datashard transactions enabled.
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableDataShardVolatileTransactions(true)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table1", SimpleTable());
CreateShardedTable(server, edgeActor, "/Root", "Table2", SimpleTable());

// Attach a JSON "updates" change stream with resolved timestamps to each table.
// NOTE(review): TDuration::Seconds(3) is presumably the heartbeat interval - confirm
// against WithResolvedTimestamps.
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table1",
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table2",
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));

// Each stream is expected to start with one "resolved" record.
// NOTE(review): the "***" placeholder presumably stands for a timestamp value that
// the JSON comparison helper masks out - confirm.
WaitForContent(server, edgeActor, "/Root/Table1/Stream", {
R"({"resolved":"***"})",
});
WaitForContent(server, edgeActor, "/Root/Table2/Stream", {
R"({"resolved":"***"})",
});

// Baseline: an unobstructed write touching both tables.
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table1` (key, value) VALUES (1, 10);
UPSERT INTO `/Root/Table2` (key, value) VALUES (2, 20);
)");

// After the baseline write each stream holds the update followed by a new
// "resolved" record.
WaitForContent(server, edgeActor, "/Root/Table1/Stream", {
R"({"resolved":"***"})",
R"({"update":{"value":10},"key":[1]})",
R"({"resolved":"***"})",
});
WaitForContent(server, edgeActor, "/Root/Table2/Stream", {
R"({"resolved":"***"})",
R"({"update":{"value":20},"key":[2]})",
R"({"resolved":"***"})",
});

// Block readset exchange: the observer takes ownership of every TEvReadSet
// event and parks it in readSets so that it can be re-sent later.
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start a distributed write to both tables. The request is only sent here;
// the returned upsertResult is not inspected anywhere in this test.
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto upsertResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
UPSERT INTO `/Root/Table1` (key, value) VALUES (3, 30);
UPSERT INTO `/Root/Table2` (key, value) VALUES (4, 40);
)", sessionId, /* txId */ "", /* commitTx */ true),
"/Root");
// Proceed once at least four readsets have been captured.
// NOTE(review): the reason for exactly four is not visible here - confirm.
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");

// Stop blocking further readsets
blockReadSets.Remove();

// Start another distributed write to both tables, it should succeed
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table1` (key, value) VALUES (5, 50);
UPSERT INTO `/Root/Table2` (key, value) VALUES (6, 60);
)");

// Let simulated time advance well past the 3 seconds configured on the streams
// while the first write still has its readsets withheld.
runtime.SimulateSleep(TDuration::Seconds(10));

// Unblock readsets: hand every captured event back to the runtime.
for (auto& ev : readSets) {
runtime.Send(ev.release(), 0, true);
}
readSets.clear();

// There should be only one resolved timestamp after out of order writes:
// the later write (50 / 60) is listed first, then the delayed one (30 / 40),
// and a single "resolved" record closes the sequence.
WaitForContent(server, edgeActor, "/Root/Table1/Stream", {
R"({"resolved":"***"})",
R"({"update":{"value":10},"key":[1]})",
R"({"resolved":"***"})",
R"({"update":{"value":50},"key":[5]})",
R"({"update":{"value":30},"key":[3]})",
R"({"resolved":"***"})",
});
WaitForContent(server, edgeActor, "/Root/Table2/Stream", {
R"({"resolved":"***"})",
R"({"update":{"value":20},"key":[2]})",
R"({"resolved":"***"})",
R"({"update":{"value":60},"key":[6]})",
R"({"update":{"value":40},"key":[4]})",
R"({"resolved":"***"})",
});
}

Y_UNIT_TEST(SequentialSplitMerge) {
TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/direct_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TDirectOpUnit : public TExecutionUnit {
void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
Pipeline.RemoveCommittingOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats(ctx);
DataShard.EmitHeartbeats();

TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
Y_ABORT_UNLESS(tx != nullptr);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/finish_propose_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void TFinishProposeUnit::Complete(TOperation::TPtr op,
Pipeline.RemoveActiveOp(op);

DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats(ctx);
DataShard.EmitHeartbeats();
}

DataShard.SendRegistrationRequestTimeCast(ctx);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/finish_propose_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext
Pipeline.RemoveActiveOp(op);

DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats(ctx);
DataShard.EmitHeartbeats();
}

DataShard.SendRegistrationRequestTimeCast(ctx);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,8 @@ namespace NKikimr::NDataShard {
Self->PromoteFollowerReadEdge();
}

Self->EmitHeartbeats();

if (!WaitingSnapshotEvents.empty()) {
TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr;
while (!WaitingSnapshotEvents.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class TWriteUnit : public TExecutionUnit {
void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
Pipeline.RemoveCommittingOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats(ctx);
DataShard.EmitHeartbeats();

TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

Expand Down