Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() {
return;
}

// We may possibly have more writes at this version
TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite);
bool wait = true;

if (const auto& plan = TransQueue.GetPlan()) {
const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
return;
edge = Min(edge, plan.begin()->ToRowVersion());
wait = false;
}

if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
return;
edge = Min(edge, version);
wait = false;
}

const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, edge));
}

WaitPlanStep(lowest.Next().Step);
if (wait) {
WaitPlanStep(lowest.Next().Step);
}
}

void TCdcStreamHeartbeatManager::Reset() {
Expand Down Expand Up @@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
return false;
}

if (Schedule.top().Version > edge) {
if (Schedule.top().Version >= edge) {
return false;
}

Expand All @@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbeatManager::EmitHeartbeats(
NTable::TDatabase& db, const TRowVersion& edge)
{
if (Schedule.empty() || Schedule.top().Version > edge) {
if (!ShouldEmitHeartbeat(edge)) {
return {};
}

Expand All @@ -234,7 +234,7 @@ THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbea

while (true) {
const auto& top = Schedule.top();
if (top.Version > edge) {
if (top.Version >= edge) {
break;
}

Expand Down
140 changes: 138 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/user_info.h>
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/tx/scheme_board/events_internal.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
Expand Down Expand Up @@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
return result;
}

void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
TVector<NJson::TJsonValue> WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
while (true) {
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
Expand All @@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
if (records.size() >= expected.size()) {
UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
"Unexpected record: " << records.at(expected.size()).second);
break;
TVector<NJson::TJsonValue> values;
for (const auto& pr : records) {
bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back());
Y_ABORT_UNLESS(ok);
}
return values;
}

SimulateSleep(server, TDuration::Seconds(1));
Expand Down Expand Up @@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

// Scenario: a CDC stream with resolved timestamps is attached to a single-shard
// table. Mediator timecast updates beyond a chosen snapshot step are held back,
// a snapshot read is served at that step, and two upserts are started while the
// updates are still held. After the updates are released, the stream must list
// both upsert records before the next resolved timestamp, and every record's
// ts must be strictly greater than the latest resolved timestamp preceding it.
Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) {
    TPortManager portManager;
    TServer::TPtr server = new TServer(
        TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
            .SetUseRealThreads(false)
            .SetDomainName("Root"));

    TDisableDataShardLogBatching noLogBatching;

    auto& runtime = *server->GetRuntime();
    const auto edgeActor = runtime.AllocateEdgeActor();

    // Advances the simulated runtime in ten consecutive 1ms sleeps; used
    // between the phases below.
    const auto settle = [&runtime] {
        for (int tick = 0; tick < 10; ++tick) {
            runtime.SimulateSleep(TDuration::MilliSeconds(1));
        }
    };

    SetupLogging(runtime);
    InitRoot(server, edgeActor);
    SetSplitMergePartCountLimit(&runtime, -1);
    CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

    // JSON "updates" stream with virtual timestamps and resolved timestamps
    // configured with a 3 second duration.
    WaitTxNotification(server, edgeActor,
        AsyncAlterAddStream(server, "/Root", "Table",
            WithVirtualTimestamps(WithResolvedTimestamps(
                TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))));

    Cerr << "... prepare" << Endl;
    WaitForContent(server, edgeActor, "/Root/Table/Stream", {
        R"({"resolved":"***"})",
    });

    KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
)");

    auto streamRecords = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
        R"({"resolved":"***"})",
        R"({"update":{"value":10},"key":[1],"ts":"***"})",
        R"({"resolved":"***"})",
    });

    // Step component of the most recent resolved timestamp in the stream.
    const ui64 lastHeartbeatStep = streamRecords.back()["resolved"][0].GetUInteger();
    Cerr << "... last heartbeat at " << lastHeartbeatStep << Endl;

    const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table");
    const auto shards = GetTableShards(server, edgeActor, "/Root/Table");
    UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);

    const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
    // NOTE(review): 3000 presumably mirrors the 3 second resolved-timestamp
    // interval expressed in plan-step units, making this the last step before
    // the next heartbeat - confirm.
    const ui64 snapshotStep = lastHeartbeatStep + 3000 - 1;
    ForwardToTablet(runtime, coordinator, edgeActor,
        new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep));

    // Hold back every timecast update whose time barrier is past the snapshot step.
    TBlockEvents<TEvMediatorTimecast::TEvUpdate> heldUpdates(runtime,
        [&](auto& ev) {
            return ev->Get()->Record.GetTimeBarrier() > snapshotStep;
        });

    Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl;
    {
        auto readRequest = std::make_unique<TEvDataShard::TEvRead>();
        {
            auto& proto = readRequest->Record;
            proto.SetReadId(1);
            proto.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
            proto.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
            proto.AddColumns(1);
            proto.AddColumns(2);
            proto.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);

            // Point read of key = 1.
            ui32 keyValue = 1;
            TVector<TCell> keyCells;
            keyCells.push_back(TCell::Make(keyValue));
            readRequest->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keyCells)));

            // Snapshot version is (snapshotStep, max TxId).
            proto.MutableSnapshot()->SetStep(snapshotStep);
            proto.MutableSnapshot()->SetTxId(Max<ui64>());
        }
        ForwardToTablet(runtime, shards.at(0), edgeActor, readRequest.release());

        auto readReply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(edgeActor);
        auto* readResult = readReply->Get();
        UNIT_ASSERT_VALUES_EQUAL(readResult->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
        UNIT_ASSERT_VALUES_EQUAL(readResult->Record.GetFinished(), true);
        Cerr << "... read finished" << Endl;
    }
    settle();

    Cerr << "... starting upsert 1 (expected to displace)" << Endl;
    auto firstUpsert = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
)");
    settle();

    Cerr << "... starting upsert 2 (expected to displace)" << Endl;
    auto secondUpsert = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
)");
    settle();

    Cerr << "... unblocking updates" << Endl;
    heldUpdates.Unblock().Stop();
    settle();

    Cerr << "... checking the update is logged before the new resolved timestamp" << Endl;
    streamRecords = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
        R"({"resolved":"***"})",
        R"({"update":{"value":10},"key":[1],"ts":"***"})",
        R"({"resolved":"***"})",
        R"({"update":{"value":20},"key":[2],"ts":"***"})",
        R"({"update":{"value":30},"key":[3],"ts":"***"})",
        R"({"resolved":"***"})",
    });

    // Walk the stream in order, remembering the latest resolved timestamp seen
    // so far; each change record must carry a strictly greater ts.
    TRowVersion lastResolved(0, 0);
    for (auto& entry : streamRecords) {
        if (entry.Has("resolved")) {
            lastResolved.Step = entry["resolved"][0].GetUInteger();
            lastResolved.TxId = entry["resolved"][1].GetUInteger();
        }
        if (entry.Has("ts")) {
            TRowVersion recordTs(
                entry["ts"][0].GetUInteger(),
                entry["ts"][1].GetUInteger());
            UNIT_ASSERT_C(lastResolved < recordTs,
                "Record with ts " << recordTs << " after resolved " << lastResolved);
        }
    }
}

} // Cdc

} // NKikimr
Expand Down