Skip to content

Commit f579bfa

Browse files
committed
Specify source of changes
1 parent 19a2355 commit f579bfa

File tree

9 files changed

+46
-17
lines changed

9 files changed

+46
-17
lines changed

ydb/core/tx/replication/service/json_change_record.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ NChangeExchange::IChangeRecord::EKind TChangeRecord::GetKind() const {
3333
: EKind::CdcDataChange;
3434
}
3535

36+
TString TChangeRecord::GetSourceId() const {
37+
return SourceId;
38+
}
39+
3640
static bool ParseKey(TVector<TCell>& cells,
3741
const NJson::TJsonValue::TArray& key, TLightweightSchema::TCPtr schema, TMemoryPool& pool, TString& error)
3842
{

ydb/core/tx/replication/service/json_change_record.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
3838
ui64 GetStep() const override;
3939
ui64 GetTxId() const override;
4040
EKind GetKind() const override;
41+
TString GetSourceId() const;
4142

4243
void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const;
4344

4445
TConstArrayRef<TCell> GetKey() const;
4546

4647
private:
48+
TString SourceId;
4749
NJson::TJsonValue JsonBody;
4850
TLightweightSchema::TCPtr Schema;
4951

@@ -55,6 +57,11 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChange
5557
public:
5658
using TBase::TBase;
5759

60+
TSelf& WithSourceId(const TString& sourceId) {
61+
GetRecord()->SourceId = sourceId;
62+
return static_cast<TSelf&>(*this);
63+
}
64+
5865
template <typename T>
5966
TSelf& WithBody(T&& body) {
6067
auto res = NJson::ReadJsonTree(body, &GetRecord()->JsonBody);

ydb/core/tx/replication/service/table_writer.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,20 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
7575
tableId.SetTableId(TablePathId.LocalPathId);
7676
// TODO: SetSchemaVersion?
7777

78+
TString source;
7879
for (auto recordPtr : ev->Get()->Records) {
7980
const auto& record = *recordPtr->Get<TChangeRecord>();
8081
record.Serialize(*event->Record.AddChanges());
81-
// TODO: set WriteTxId, Source
82+
83+
if (!source) {
84+
source = record.GetSourceId();
85+
} else {
86+
Y_ABORT_UNLESS(source == record.GetSourceId());
87+
}
88+
}
89+
90+
if (source) {
91+
event->Record.SetSource(source);
8292
}
8393

8494
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false));
@@ -418,6 +428,7 @@ class TLocalTableWriter
418428
for (auto& record : ev->Get()->Records) {
419429
records.emplace_back(record.Offset, PathId, record.Data.size());
420430
auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilder()
431+
.WithSourceId(ev->Get()->Source)
421432
.WithOrder(record.Offset)
422433
.WithBody(std::move(record.Data))
423434
.WithSchema(Schema)

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
3131
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
3232

3333
using TRecord = TEvWorker::TEvData::TRecord;
34-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
34+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
3535
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
3636
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
3737
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
@@ -73,7 +73,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
7373
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
7474

7575
using TRecord = TEvWorker::TEvData::TRecord;
76-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
76+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
7777
TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
7878
TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
7979
TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"),

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
6161
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
6262
}
6363

64-
Send(Worker, new TEvWorker::TEvData(std::move(records)));
64+
Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records)));
6565
}
6666

6767
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {

ydb/core/tx/replication/service/worker.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
2323
{
2424
}
2525

26-
TEvWorker::TEvData::TEvData(const TVector<TRecord>& records)
27-
: Records(records)
26+
TEvWorker::TEvData::TEvData(const TString& source, const TVector<TRecord>& records)
27+
: Source(source)
28+
, Records(records)
2829
{
2930
}
3031

31-
TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
32-
: Records(std::move(records))
32+
TEvWorker::TEvData::TEvData(const TString& source, TVector<TRecord>&& records)
33+
: Source(source)
34+
, Records(std::move(records))
3335
{
3436
}
3537

@@ -42,6 +44,7 @@ void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
4244

4345
TString TEvWorker::TEvData::ToString() const {
4446
return TStringBuilder() << ToStringHeader() << " {"
47+
<< " Source: " << Source
4548
<< " Records [" << JoinSeq(",", Records) << "]"
4649
<< " }";
4750
}
@@ -115,16 +118,16 @@ class TWorker: public TActorBootstrapped<TWorker> {
115118
<< ": sender# " << ev->Sender);
116119

117120
Reader.Registered();
118-
if (!InFlightRecords) {
121+
if (!InFlightData) {
119122
Send(Reader, new TEvWorker::TEvPoll());
120123
}
121124
} else if (ev->Sender == Writer) {
122125
LOG_I("Handshake with writer"
123126
<< ": sender# " << ev->Sender);
124127

125128
Writer.Registered();
126-
if (InFlightRecords) {
127-
Send(Writer, new TEvWorker::TEvData(InFlightRecords));
129+
if (InFlightData) {
130+
Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records));
128131
}
129132
} else {
130133
LOG_W("Handshake from unknown actor"
@@ -142,7 +145,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
142145
return;
143146
}
144147

145-
InFlightRecords.clear();
148+
InFlightData.Reset();
146149
if (Reader) {
147150
Send(ev->Forward(Reader));
148151
}
@@ -157,8 +160,8 @@ class TWorker: public TActorBootstrapped<TWorker> {
157160
return;
158161
}
159162

160-
Y_ABORT_UNLESS(InFlightRecords.empty());
161-
InFlightRecords = ev->Get()->Records;
163+
Y_ABORT_UNLESS(!InFlightData);
164+
InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->Source, ev->Get()->Records);
162165

163166
if (Writer) {
164167
Send(ev->Forward(Writer));
@@ -239,7 +242,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
239242
mutable TMaybe<TString> LogPrefix;
240243
TActorInfo Reader;
241244
TActorInfo Writer;
242-
TVector<TEvWorker::TEvData::TRecord> InFlightRecords;
245+
THolder<TEvWorker::TEvData> InFlightData;
243246
};
244247

245248
IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn) {

ydb/core/tx/replication/service/worker.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ struct TEvWorker {
3636
void Out(IOutputStream& out) const;
3737
};
3838

39+
TString Source;
3940
TVector<TRecord> Records;
4041

41-
explicit TEvData(const TVector<TRecord>& records);
42-
explicit TEvData(TVector<TRecord>&& records);
42+
explicit TEvData(const TString& source, const TVector<TRecord>& records);
43+
explicit TEvData(const TString& source, TVector<TRecord>&& records);
4344
TString ToString() const override;
4445
};
4546

ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ void TEvYdbProxy::TReadTopicResult::TMessage::Out(IOutputStream& out) const {
3333

3434
void TEvYdbProxy::TReadTopicResult::Out(IOutputStream& out) const {
3535
out << "{"
36+
<< " PartitionId: " << PartitionId
3637
<< " Messages [" << JoinSeq(",", Messages) << "]"
3738
<< " }";
3839
}

ydb/core/tx/replication/ydb_proxy/ydb_proxy.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ struct TEvYdbProxy {
186186
};
187187

188188
explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
189+
PartitionId = event.GetPartitionSession()->GetPartitionId();
189190
Messages.reserve(event.GetMessagesCount());
190191
if (event.HasCompressedMessages()) {
191192
for (const auto& msg : event.GetCompressedMessages()) {
@@ -200,6 +201,7 @@ struct TEvYdbProxy {
200201

201202
void Out(IOutputStream& out) const;
202203

204+
ui64 PartitionId;
203205
TVector<TMessage> Messages;
204206
};
205207

0 commit comments

Comments
 (0)