Skip to content

Commit baa0367

Browse files
authored
Send heartbeat message after writing the data (#13168)
1 parent d7e5925 commit baa0367

File tree

2 files changed

+96
-2
lines changed

2 files changed

+96
-2
lines changed

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,7 @@ class TLocalTableWriter
439439
const auto version = TRowVersion(record->GetStep(), record->GetTxId());
440440

441441
if (record->GetKind() == NChangeExchange::IChangeRecord::EKind::CdcHeartbeat) {
442-
TxIds.erase(TxIds.begin(), TxIds.upper_bound(version));
443-
Send(Worker, new TEvService::TEvHeartbeat(version));
442+
PendingHeartbeat = version;
444443
continue;
445444
} else if (record->GetKind() != NChangeExchange::IChangeRecord::EKind::CdcDataChange) {
446445
Y_ABORT("Unexpected record kind");
@@ -467,6 +466,12 @@ class TLocalTableWriter
467466
EnqueueRecords(std::move(records));
468467
} else if (PendingTxId.empty()) {
469468
Y_ABORT_UNLESS(PendingRecords.empty());
469+
470+
if (const auto maxVersion = std::exchange(PendingHeartbeat, TRowVersion::Min())) {
471+
TxIds.erase(TxIds.begin(), TxIds.upper_bound(maxVersion));
472+
Send(Worker, new TEvService::TEvHeartbeat(maxVersion));
473+
}
474+
470475
Send(Worker, new TEvWorker::TEvPoll());
471476
}
472477
}
@@ -522,6 +527,11 @@ class TLocalTableWriter
522527
}
523528

524529
if (PendingRecords.empty() && PendingTxId.empty()) {
530+
if (const auto maxVersion = std::exchange(PendingHeartbeat, TRowVersion::Min())) {
531+
TxIds.erase(TxIds.begin(), TxIds.upper_bound(maxVersion));
532+
Send(Worker, new TEvService::TEvHeartbeat(maxVersion));
533+
}
534+
525535
Send(Worker, new TEvWorker::TEvPoll());
526536
}
527537
}
@@ -617,6 +627,7 @@ class TLocalTableWriter
617627
TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
618628
TMap<TRowVersion, ui64> TxIds; // key is non-inclusive right hand edge
619629
TMap<TRowVersion, TVector<NChangeExchange::IChangeRecord::TPtr>> PendingTxId;
630+
TRowVersion PendingHeartbeat = TRowVersion::Min();
620631

621632
}; // TLocalTableWriter
622633

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,89 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
286286
{TRowVersion(20, 0), 2},
287287
}));
288288
}
289+
290+
Y_UNIT_TEST(DataAlongWithHeartbeat) {
291+
class TMockWorker: public TActorBootstrapped<TMockWorker> {
292+
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
293+
if (ev->Sender == Edge) {
294+
ev->Sender = SelfId();
295+
Send(ev->Forward(Writer));
296+
} else {
297+
Send(ev->Forward(Edge));
298+
}
299+
}
300+
301+
void Handle(TEvService::TEvGetTxId::TPtr& ev) {
302+
++GetTxIds;
303+
Send(ev->Forward(Edge));
304+
}
305+
306+
void Handle(TEvService::TEvHeartbeat::TPtr& ev) {
307+
UNIT_ASSERT(Heartbeats++ < GetTxIds);
308+
Send(ev->Forward(Edge));
309+
}
310+
311+
void Handle(TEvWorker::TEvPoll::TPtr& ev) {
312+
UNIT_ASSERT(Polls++ < Heartbeats);
313+
Send(ev->Forward(Edge));
314+
}
315+
316+
public:
317+
explicit TMockWorker(const TActorId& writer, const TActorId& edge)
318+
: Writer(writer)
319+
, Edge(edge)
320+
{}
321+
322+
void Bootstrap() {
323+
Become(&TThis::StateWork);
324+
}
325+
326+
STATEFN(StateWork) {
327+
switch (ev->GetTypeRewrite()) {
328+
hFunc(TEvWorker::TEvHandshake, Handle);
329+
hFunc(TEvService::TEvGetTxId, Handle);
330+
hFunc(TEvService::TEvHeartbeat, Handle);
331+
hFunc(TEvWorker::TEvPoll, Handle);
332+
}
333+
}
334+
335+
private:
336+
const TActorId Writer;
337+
const TActorId Edge;
338+
ui32 GetTxIds = 0;
339+
ui32 Heartbeats = 0;
340+
ui32 Polls = 0;
341+
};
342+
343+
TEnv env;
344+
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);
345+
346+
env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{
347+
.Name = "Table",
348+
.KeyColumns = {"key"},
349+
.Columns = {
350+
{.Name = "key", .Type = "Uint32"},
351+
{.Name = "value", .Type = "Utf8"},
352+
},
353+
.ReplicationConfig = TTestTableDescription::TReplicationConfig{
354+
.Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY,
355+
.ConsistencyLevel = TTestTableDescription::TReplicationConfig::CONSISTENCY_LEVEL_GLOBAL,
356+
},
357+
}));
358+
359+
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent));
360+
auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender()));
361+
362+
env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
363+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
364+
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
365+
TRecord(2, R"({"resolved":[10,0]})"),
366+
}));
367+
env.Send<TEvService::TEvHeartbeat>(writer, MakeTxIdResult({
368+
{TRowVersion(10, 0), 1},
369+
}));
370+
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
371+
}
289372
}
290373

291374
}

0 commit comments

Comments
 (0)