Skip to content

Commit 14a7b7e

Browse files
authored
Apply changes in correct order (#13232)
1 parent c456fce commit 14a7b7e

File tree

6 files changed

+156
-34
lines changed

6 files changed

+156
-34
lines changed

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,12 +1539,10 @@ void ApplyChanges(
15391539
}
15401540

15411541
TRowVersion CommitWrites(
1542-
Tests::TServer::TPtr server,
1542+
TTestActorRuntime& runtime,
15431543
const TVector<TString>& tables,
15441544
ui64 writeTxId)
15451545
{
1546-
auto& runtime = *server->GetRuntime();
1547-
15481546
TActorId sender = runtime.AllocateEdgeActor();
15491547

15501548
{
@@ -1571,6 +1569,14 @@ TRowVersion CommitWrites(
15711569
return { step, txId };
15721570
}
15731571

1572+
TRowVersion CommitWrites(
1573+
Tests::TServer::TPtr server,
1574+
const TVector<TString>& tables,
1575+
ui64 writeTxId)
1576+
{
1577+
return CommitWrites(*server->GetRuntime(), tables, writeTxId);
1578+
}
1579+
15741580
ui64 AsyncDropTable(
15751581
Tests::TServer::TPtr server,
15761582
TActorId sender,
@@ -2615,6 +2621,7 @@ namespace {
26152621
PRINT_PRIMITIVE(Datetime64);
26162622
PRINT_PRIMITIVE(Timestamp64);
26172623
PRINT_PRIMITIVE(String);
2624+
PRINT_PRIMITIVE(Utf8);
26182625
PRINT_PRIMITIVE(DyNumber);
26192626

26202627
default:
@@ -2654,13 +2661,12 @@ namespace {
26542661
} // namespace
26552662

26562663
TReadShardedTableState StartReadShardedTable(
2657-
Tests::TServer::TPtr server,
2664+
TTestActorRuntime& runtime,
26582665
const TString& path,
26592666
TRowVersion snapshot,
26602667
bool pause,
26612668
bool ordered)
26622669
{
2663-
auto& runtime = *server->GetRuntime();
26642670
auto sender = runtime.AllocateEdgeActor();
26652671
auto worker = runtime.Register(new TReadTableImpl(sender, path, snapshot, pause, ordered));
26662672
auto ev = runtime.GrabEdgeEventRethrow<TReadTableImpl::TEvResult>(sender);
@@ -2671,6 +2677,16 @@ TReadShardedTableState StartReadShardedTable(
26712677
return { sender, worker, result };
26722678
}
26732679

2680+
TReadShardedTableState StartReadShardedTable(
2681+
Tests::TServer::TPtr server,
2682+
const TString& path,
2683+
TRowVersion snapshot,
2684+
bool pause,
2685+
bool ordered)
2686+
{
2687+
return StartReadShardedTable(*server->GetRuntime(), path, snapshot, pause, ordered);
2688+
}
2689+
26742690
void ResumeReadShardedTable(
26752691
Tests::TServer::TPtr server,
26762692
TReadShardedTableState& state)
@@ -2681,12 +2697,20 @@ void ResumeReadShardedTable(
26812697
state.Result = ev->Get()->Result;
26822698
}
26832699

2700+
TString ReadShardedTable(
2701+
TTestActorRuntime& runtime,
2702+
const TString& path,
2703+
TRowVersion snapshot)
2704+
{
2705+
return StartReadShardedTable(runtime, path, snapshot, /* pause = */ false).Result;
2706+
}
2707+
26842708
TString ReadShardedTable(
26852709
Tests::TServer::TPtr server,
26862710
const TString& path,
26872711
TRowVersion snapshot)
26882712
{
2689-
return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result;
2713+
return ReadShardedTable(*server->GetRuntime(), path, snapshot);
26902714
}
26912715

26922716
void SendViaPipeCache(

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,10 @@ void ApplyChanges(
597597
NKikimrTxDataShard::TEvApplyReplicationChangesResult::EStatus expected =
598598
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);
599599

600+
TRowVersion CommitWrites(
601+
TTestActorRuntime& runtime,
602+
const TVector<TString>& tables,
603+
ui64 writeTxId);
600604
TRowVersion CommitWrites(
601605
Tests::TServer::TPtr server,
602606
const TVector<TString>& tables,
@@ -724,6 +728,12 @@ struct TReadShardedTableState {
724728
TString Result;
725729
};
726730

731+
TReadShardedTableState StartReadShardedTable(
732+
TTestActorRuntime& runtime,
733+
const TString& path,
734+
TRowVersion snapshot = TRowVersion::Max(),
735+
bool pause = true,
736+
bool ordered = true);
727737
TReadShardedTableState StartReadShardedTable(
728738
Tests::TServer::TPtr server,
729739
const TString& path,
@@ -735,6 +745,10 @@ void ResumeReadShardedTable(
735745
Tests::TServer::TPtr server,
736746
TReadShardedTableState& state);
737747

748+
TString ReadShardedTable(
749+
TTestActorRuntime& runtime,
750+
const TString& path,
751+
TRowVersion snapshot = TRowVersion::Max());
738752
TString ReadShardedTable(
739753
Tests::TServer::TPtr server,
740754
const TString& path,

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <ydb/library/actors/core/hfunc.h>
1616
#include <ydb/library/services/services.pb.h>
1717

18+
#include <util/generic/hash.h>
1819
#include <util/generic/map.h>
1920
#include <util/generic/maybe.h>
2021
#include <util/generic/set.h>
@@ -447,14 +448,19 @@ class TLocalTableWriter
447448

448449
if (auto it = TxIds.upper_bound(version); it != TxIds.end()) {
449450
record->RewriteTxId(it->second);
451+
if (PendingTxId.empty()) {
452+
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
453+
} else {
454+
BlockedRecords.insert(record->GetOrder());
455+
}
450456
} else {
451457
versionsWithoutTxId.insert(version);
452-
PendingTxId[version].push_back(std::move(record));
453-
continue;
458+
PendingTxId.insert(record->GetOrder());
454459
}
460+
} else {
461+
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
455462
}
456463

457-
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
458464
Y_ABORT_UNLESS(PendingRecords.emplace(record->GetOrder(), std::move(record)).second);
459465
}
460466

@@ -485,18 +491,55 @@ class TLocalTableWriter
485491
const auto version = TRowVersion::FromProto(kv.GetVersion());
486492
TxIds.emplace(version, kv.GetTxId());
487493

488-
for (auto it = PendingTxId.begin(); it != PendingTxId.end();) {
489-
if (it->first >= version) {
490-
break;
494+
auto pendingIt = PendingTxId.begin();
495+
auto blockedIt = BlockedRecords.begin();
496+
497+
auto processPending = [this, &records, &version, txId = kv.GetTxId()](auto& it) -> bool {
498+
Y_ABORT_UNLESS(PendingRecords.contains(*it));
499+
auto& record = PendingRecords.at(*it);
500+
501+
if (TRowVersion(record->GetStep(), record->GetTxId()) >= version) {
502+
return false;
491503
}
492504

493-
for (auto& record : it->second) {
494-
record->RewriteTxId(kv.GetTxId());
495-
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
496-
Y_ABORT_UNLESS(PendingRecords.emplace(record->GetOrder(), std::move(record)).second);
505+
record->RewriteTxId(txId);
506+
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
507+
508+
it = PendingTxId.erase(it);
509+
return true;
510+
};
511+
512+
auto processBlocked = [this, &records](auto& it) {
513+
Y_ABORT_UNLESS(PendingRecords.contains(*it));
514+
auto& record = PendingRecords.at(*it);
515+
516+
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
517+
518+
it = BlockedRecords.erase(it);
519+
};
520+
521+
while (pendingIt != PendingTxId.end() && blockedIt != BlockedRecords.end()) {
522+
if (*pendingIt < *blockedIt) {
523+
if (!processPending(pendingIt)) {
524+
break;
525+
}
526+
} else {
527+
processBlocked(blockedIt);
497528
}
529+
}
498530

499-
PendingTxId.erase(it++);
531+
if (blockedIt == BlockedRecords.end()) {
532+
for (; pendingIt != PendingTxId.end();) {
533+
if (!processPending(pendingIt)) {
534+
break;
535+
}
536+
}
537+
}
538+
539+
if (pendingIt == PendingTxId.end()) {
540+
for (; blockedIt != BlockedRecords.end();) {
541+
processBlocked(blockedIt);
542+
}
500543
}
501544
}
502545

@@ -624,9 +667,10 @@ class TLocalTableWriter
624667
bool Resolving = false;
625668
bool Initialized = false;
626669

627-
TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
670+
THashMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
628671
TMap<TRowVersion, ui64> TxIds; // key is non-inclusive right hand edge
629-
TMap<TRowVersion, TVector<NChangeExchange::IChangeRecord::TPtr>> PendingTxId;
672+
TSet<ui64> PendingTxId;
673+
TSet<ui64> BlockedRecords;
630674
TRowVersion PendingHeartbeat = TRowVersion::Min();
631675

632676
}; // TLocalTableWriter

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
#include "table_writer.h"
33
#include "worker.h"
44

5+
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
56
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
67
#include <ydb/core/tx/replication/ut_helpers/test_table.h>
78

89
#include <library/cpp/string_utils/base64/base64.h>
910
#include <library/cpp/testing/unittest/registar.h>
1011

1112
#include <util/string/printf.h>
13+
#include <util/string/strip.h>
1214

1315
namespace NKikimr::NReplication::NService {
1416

@@ -187,9 +189,6 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
187189
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
188190
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"),
189191
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"),
190-
}));
191-
192-
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
193192
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"),
194193
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"),
195194
}));
@@ -211,7 +210,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
211210
}
212211

213212
Y_UNIT_TEST(WaitTxIds) {
214-
class TMockWorker: public TActorBootstrapped<TMockWorker> {
213+
class TMockWorker: public TActor<TMockWorker> {
215214
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
216215
if (ev->Sender == Edge) {
217216
ev->Sender = SelfId();
@@ -232,14 +231,11 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
232231

233232
public:
234233
explicit TMockWorker(const TActorId& writer, const TActorId& edge)
235-
: Writer(writer)
234+
: TActor(&TThis::StateWork)
235+
, Writer(writer)
236236
, Edge(edge)
237237
{}
238238

239-
void Bootstrap() {
240-
Become(&TThis::StateWork);
241-
}
242-
243239
STATEFN(StateWork) {
244240
switch (ev->GetTypeRewrite()) {
245241
hFunc(TEvWorker::TEvHandshake, Handle);
@@ -288,7 +284,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
288284
}
289285

290286
Y_UNIT_TEST(DataAlongWithHeartbeat) {
291-
class TMockWorker: public TActorBootstrapped<TMockWorker> {
287+
class TMockWorker: public TActor<TMockWorker> {
292288
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
293289
if (ev->Sender == Edge) {
294290
ev->Sender = SelfId();
@@ -315,14 +311,11 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
315311

316312
public:
317313
explicit TMockWorker(const TActorId& writer, const TActorId& edge)
318-
: Writer(writer)
314+
: TActor(&TThis::StateWork)
315+
, Writer(writer)
319316
, Edge(edge)
320317
{}
321318

322-
void Bootstrap() {
323-
Become(&TThis::StateWork);
324-
}
325-
326319
STATEFN(StateWork) {
327320
switch (ev->GetTypeRewrite()) {
328321
hFunc(TEvWorker::TEvHandshake, Handle);
@@ -369,6 +362,50 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
369362
}));
370363
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
371364
}
365+
366+
Y_UNIT_TEST(ApplyInCorrectOrder) {
367+
TEnv env;
368+
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);
369+
370+
env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{
371+
.Name = "Table",
372+
.KeyColumns = {"key"},
373+
.Columns = {
374+
{.Name = "key", .Type = "Uint32"},
375+
{.Name = "value", .Type = "Utf8"},
376+
},
377+
.ReplicationConfig = TTestTableDescription::TReplicationConfig{
378+
.Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY,
379+
.ConsistencyLevel = TTestTableDescription::TReplicationConfig::CONSISTENCY_LEVEL_GLOBAL,
380+
},
381+
}));
382+
383+
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent));
384+
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
385+
386+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
387+
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
388+
}));
389+
env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({
390+
{TRowVersion(10, 0), 1},
391+
}));
392+
393+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
394+
TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"),
395+
TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
396+
TRecord(4, R"({"resolved":[20,0]})"),
397+
}));
398+
env.Send<TEvService::TEvHeartbeat>(writer, MakeTxIdResult({
399+
{TRowVersion(20, 0), 2},
400+
}));
401+
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
402+
403+
CommitWrites(env.GetRuntime(), {"/Root/Table"}, 1);
404+
CommitWrites(env.GetRuntime(), {"/Root/Table"}, 2);
405+
406+
auto content = ReadShardedTable(env.GetRuntime(), "/Root/Table");
407+
UNIT_ASSERT_STRINGS_EQUAL(StripInPlace(content), "key = 1, value = 10\nkey = 2, value = 20\nkey = 3, value = 30");
408+
}
372409
}
373410

374411
}

ydb/core/tx/replication/service/ut_table_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ FORK_SUBTESTS()
55
SIZE(MEDIUM)
66

77
PEERDIR(
8+
ydb/core/tx/datashard/ut_common
89
ydb/core/tx/replication/ut_helpers
910
library/cpp/string_utils/base64
1011
library/cpp/testing/unittest

ydb/core/tx/replication/ut_helpers/write_topic.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
24

35
namespace NKikimr::NReplication::NTestHelpers {

0 commit comments

Comments
 (0)