Skip to content

Commit 6735aec

Browse files
authored
Specify source of changes (#4680)
1 parent 7583345 commit 6735aec

File tree

9 files changed

+46
-17
lines changed

9 files changed

+46
-17
lines changed

ydb/core/tx/replication/service/json_change_record.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ NChangeExchange::IChangeRecord::EKind TChangeRecord::GetKind() const {
3333
: EKind::CdcDataChange;
3434
}
3535

36+
TString TChangeRecord::GetSourceId() const {
37+
return SourceId;
38+
}
39+
3640
static bool ParseKey(TVector<TCell>& cells,
3741
const NJson::TJsonValue::TArray& key, TLightweightSchema::TCPtr schema, TMemoryPool& pool, TString& error)
3842
{

ydb/core/tx/replication/service/json_change_record.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
4040
ui64 GetStep() const override;
4141
ui64 GetTxId() const override;
4242
EKind GetKind() const override;
43+
TString GetSourceId() const;
4344

4445
void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TMemoryPool& pool) const;
4546
void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const;
@@ -48,6 +49,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
4849
TConstArrayRef<TCell> GetKey() const;
4950

5051
private:
52+
TString SourceId;
5153
NJson::TJsonValue JsonBody;
5254
TLightweightSchema::TCPtr Schema;
5355

@@ -59,6 +61,11 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChange
5961
public:
6062
using TBase::TBase;
6163

64+
TSelf& WithSourceId(const TString& sourceId) {
65+
GetRecord()->SourceId = sourceId;
66+
return static_cast<TSelf&>(*this);
67+
}
68+
6269
template <typename T>
6370
TSelf& WithBody(T&& body) {
6471
auto res = NJson::ReadJsonTree(body, &GetRecord()->JsonBody);

ydb/core/tx/replication/service/table_writer.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,21 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
7676
tableId.SetTableId(TableId.PathId.LocalPathId);
7777
tableId.SetSchemaVersion(TableId.SchemaVersion);
7878

79+
TString source;
7980
for (auto recordPtr : ev->Get()->Records) {
8081
MemoryPool.Clear();
8182
const auto& record = *recordPtr->Get<TChangeRecord>();
8283
record.Serialize(*event->Record.AddChanges(), MemoryPool);
83-
// TODO: set WriteTxId, Source
84+
85+
if (!source) {
86+
source = record.GetSourceId();
87+
} else {
88+
Y_ABORT_UNLESS(source == record.GetSourceId());
89+
}
90+
}
91+
92+
if (source) {
93+
event->Record.SetSource(source);
8494
}
8595

8696
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false));
@@ -427,6 +437,7 @@ class TLocalTableWriter
427437
for (auto& record : ev->Get()->Records) {
428438
records.emplace_back(record.Offset, PathId, record.Data.size());
429439
auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilder()
440+
.WithSourceId(ev->Get()->Source)
430441
.WithOrder(record.Offset)
431442
.WithBody(std::move(record.Data))
432443
.WithSchema(Schema)

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
3131
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
3232

3333
using TRecord = TEvWorker::TEvData::TRecord;
34-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
34+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
3535
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
3636
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
3737
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
@@ -73,7 +73,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
7373
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
7474

7575
using TRecord = TEvWorker::TEvData::TRecord;
76-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
76+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
7777
TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
7878
TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
7979
TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"),

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
6161
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
6262
}
6363

64-
Send(Worker, new TEvWorker::TEvData(std::move(records)));
64+
Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records)));
6565
}
6666

6767
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {

ydb/core/tx/replication/service/worker.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
2323
{
2424
}
2525

26-
TEvWorker::TEvData::TEvData(const TVector<TRecord>& records)
27-
: Records(records)
26+
TEvWorker::TEvData::TEvData(const TString& source, const TVector<TRecord>& records)
27+
: Source(source)
28+
, Records(records)
2829
{
2930
}
3031

31-
TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
32-
: Records(std::move(records))
32+
TEvWorker::TEvData::TEvData(const TString& source, TVector<TRecord>&& records)
33+
: Source(source)
34+
, Records(std::move(records))
3335
{
3436
}
3537

@@ -42,6 +44,7 @@ void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
4244

4345
TString TEvWorker::TEvData::ToString() const {
4446
return TStringBuilder() << ToStringHeader() << " {"
47+
<< " Source: " << Source
4548
<< " Records [" << JoinSeq(",", Records) << "]"
4649
<< " }";
4750
}
@@ -115,16 +118,16 @@ class TWorker: public TActorBootstrapped<TWorker> {
115118
<< ": sender# " << ev->Sender);
116119

117120
Reader.Registered();
118-
if (!InFlightRecords) {
121+
if (!InFlightData) {
119122
Send(Reader, new TEvWorker::TEvPoll());
120123
}
121124
} else if (ev->Sender == Writer) {
122125
LOG_I("Handshake with writer"
123126
<< ": sender# " << ev->Sender);
124127

125128
Writer.Registered();
126-
if (InFlightRecords) {
127-
Send(Writer, new TEvWorker::TEvData(InFlightRecords));
129+
if (InFlightData) {
130+
Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records));
128131
}
129132
} else {
130133
LOG_W("Handshake from unknown actor"
@@ -142,7 +145,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
142145
return;
143146
}
144147

145-
InFlightRecords.clear();
148+
InFlightData.Reset();
146149
if (Reader) {
147150
Send(ev->Forward(Reader));
148151
}
@@ -157,8 +160,8 @@ class TWorker: public TActorBootstrapped<TWorker> {
157160
return;
158161
}
159162

160-
Y_ABORT_UNLESS(InFlightRecords.empty());
161-
InFlightRecords = ev->Get()->Records;
163+
Y_ABORT_UNLESS(!InFlightData);
164+
InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->Source, ev->Get()->Records);
162165

163166
if (Writer) {
164167
Send(ev->Forward(Writer));
@@ -239,7 +242,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
239242
mutable TMaybe<TString> LogPrefix;
240243
TActorInfo Reader;
241244
TActorInfo Writer;
242-
TVector<TEvWorker::TEvData::TRecord> InFlightRecords;
245+
THolder<TEvWorker::TEvData> InFlightData;
243246
};
244247

245248
IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn) {

ydb/core/tx/replication/service/worker.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ struct TEvWorker {
3636
void Out(IOutputStream& out) const;
3737
};
3838

39+
TString Source;
3940
TVector<TRecord> Records;
4041

41-
explicit TEvData(const TVector<TRecord>& records);
42-
explicit TEvData(TVector<TRecord>&& records);
42+
explicit TEvData(const TString& source, const TVector<TRecord>& records);
43+
explicit TEvData(const TString& source, TVector<TRecord>&& records);
4344
TString ToString() const override;
4445
};
4546

ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ void TEvYdbProxy::TReadTopicResult::TMessage::Out(IOutputStream& out) const {
3333

3434
void TEvYdbProxy::TReadTopicResult::Out(IOutputStream& out) const {
3535
out << "{"
36+
<< " PartitionId: " << PartitionId
3637
<< " Messages [" << JoinSeq(",", Messages) << "]"
3738
<< " }";
3839
}

ydb/core/tx/replication/ydb_proxy/ydb_proxy.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ struct TEvYdbProxy {
186186
};
187187

188188
explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
189+
PartitionId = event.GetPartitionSession()->GetPartitionId();
189190
Messages.reserve(event.GetMessagesCount());
190191
if (event.HasCompressedMessages()) {
191192
for (const auto& msg : event.GetCompressedMessages()) {
@@ -200,6 +201,7 @@ struct TEvYdbProxy {
200201

201202
void Out(IOutputStream& out) const;
202203

204+
ui64 PartitionId;
203205
TVector<TMessage> Messages;
204206
};
205207

0 commit comments

Comments
 (0)