Skip to content

Commit 13d371d

Browse files
committed
Common TChangeRecord in ydb/core/change_exchange
1 parent ba66d9c commit 13d371d

File tree

7 files changed

+63
-267
lines changed

7 files changed

+63
-267
lines changed

ydb/core/change_exchange/change_record.cpp

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22

33
#include <ydb/core/protos/change_exchange.pb.h>
44

5-
namespace NKikimr::NDataShard {
5+
namespace NKikimr::NChangeExchange {
66

77
void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const {
88
record.SetOrder(Order);
99
record.SetGroup(Group);
1010
record.SetStep(Step);
1111
record.SetTxId(TxId);
12-
record.SetPathOwnerId(PathId.OwnerId);
13-
record.SetLocalPathId(PathId.LocalPathId);
1412

1513
switch (Kind) {
1614
case EKind::AsyncIndex: {
@@ -70,15 +68,6 @@ TInstant TChangeRecord::GetApproximateCreationDateTime() const {
7068
: TInstant::MilliSeconds(GetStep());
7169
}
7270

73-
bool TChangeRecord::IsBroadcast() const {
74-
switch (Kind) {
75-
case EKind::CdcHeartbeat:
76-
return true;
77-
default:
78-
return false;
79-
}
80-
}
81-
8271
TString TChangeRecord::ToString() const {
8372
TString result;
8473
TStringOutput out(result);
@@ -92,14 +81,9 @@ void TChangeRecord::Out(IOutputStream& out) const {
9281
<< " Group: " << Group
9382
<< " Step: " << Step
9483
<< " TxId: " << TxId
95-
<< " PathId: " << PathId
9684
<< " Kind: " << Kind
9785
<< " Source: " << Source
9886
<< " Body: " << Body.size() << "b"
99-
<< " TableId: " << TableId
100-
<< " SchemaVersion: " << SchemaVersion
101-
<< " LockId: " << LockId
102-
<< " LockOffset: " << LockOffset
10387
<< " }";
10488
}
10589

Lines changed: 27 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
#pragma once
22

3-
#include "datashard_user_table.h"
4-
5-
#include <ydb/core/scheme/scheme_pathid.h>
63
#include <ydb/core/scheme/scheme_tablecell.h>
74

85
#include <util/generic/maybe.h>
@@ -12,12 +9,12 @@ namespace NKikimrChangeExchange {
129
class TChangeRecord;
1310
}
1411

15-
namespace NKikimr::NDataShard {
12+
namespace NKikimr::NChangeExchange {
1613

17-
class TChangeRecordBuilder;
14+
template <typename T, typename TDerived> class TChangeRecordBuilder;
1815

1916
class TChangeRecord {
20-
friend class TChangeRecordBuilder;
17+
template <typename T, typename TDerived> friend class TChangeRecordBuilder;
2118

2219
public:
2320
enum class ESource: ui8 {
@@ -36,50 +33,37 @@ class TChangeRecord {
3633
ui64 GetGroup() const { return Group; }
3734
ui64 GetStep() const { return Step; }
3835
ui64 GetTxId() const { return TxId; }
39-
ui64 GetLockId() const { return LockId; }
40-
ui64 GetLockOffset() const { return LockOffset; }
41-
const TPathId& GetPathId() const { return PathId; }
4236
EKind GetKind() const { return Kind; }
4337
const TString& GetBody() const { return Body; }
4438
ESource GetSource() const { return Source; }
4539

46-
const TPathId& GetTableId() const { return TableId; }
47-
ui64 GetSchemaVersion() const { return SchemaVersion; }
48-
TUserTable::TCPtr GetSchema() const { return Schema; }
49-
5040
void Serialize(NKikimrChangeExchange::TChangeRecord& record) const;
5141

5242
TConstArrayRef<TCell> GetKey() const;
5343
i64 GetSeqNo() const;
54-
TString GetPartitionKey() const;
5544
TInstant GetApproximateCreationDateTime() const;
56-
bool IsBroadcast() const;
5745

5846
TString ToString() const;
5947
void Out(IOutputStream& out) const;
6048

61-
private:
49+
protected:
6250
ui64 Order = Max<ui64>();
6351
ui64 Group = 0;
6452
ui64 Step = 0;
6553
ui64 TxId = 0;
66-
ui64 LockId = 0;
67-
ui64 LockOffset = 0;
68-
TPathId PathId;
6954
EKind Kind;
7055
TString Body;
7156
ESource Source = ESource::Unspecified;
7257

73-
ui64 SchemaVersion;
74-
TPathId TableId;
75-
TUserTable::TCPtr Schema;
76-
7758
mutable TMaybe<TOwnedCellVec> Key;
7859
mutable TMaybe<TString> PartitionKey;
7960

8061
}; // TChangeRecord
8162

63+
template <typename T, typename TDerived>
8264
class TChangeRecordBuilder {
65+
protected:
66+
using TSelf = TDerived;
8367
using EKind = TChangeRecord::EKind;
8468
using ESource = TChangeRecord::ESource;
8569

@@ -88,87 +72,56 @@ class TChangeRecordBuilder {
8872
Record.Kind = kind;
8973
}
9074

91-
explicit TChangeRecordBuilder(TChangeRecord&& record)
92-
: Record(std::move(record))
93-
{
94-
}
95-
96-
TChangeRecordBuilder& WithLockId(ui64 lockId) {
97-
Record.LockId = lockId;
98-
return *this;
99-
}
100-
101-
TChangeRecordBuilder& WithLockOffset(ui64 lockOffset) {
102-
Record.LockOffset = lockOffset;
103-
return *this;
75+
explicit TChangeRecordBuilder(T&& record) {
76+
Record = std::move(record);
10477
}
10578

106-
TChangeRecordBuilder& WithOrder(ui64 order) {
79+
TSelf& WithOrder(ui64 order) {
10780
Record.Order = order;
108-
return *this;
81+
return static_cast<TSelf&>(*this);
10982
}
11083

111-
TChangeRecordBuilder& WithGroup(ui64 group) {
84+
TSelf& WithGroup(ui64 group) {
11285
Record.Group = group;
113-
return *this;
86+
return static_cast<TSelf&>(*this);
11487
}
11588

116-
TChangeRecordBuilder& WithStep(ui64 step) {
89+
TSelf& WithStep(ui64 step) {
11790
Record.Step = step;
118-
return *this;
91+
return static_cast<TSelf&>(*this);
11992
}
12093

121-
TChangeRecordBuilder& WithTxId(ui64 txId) {
94+
TSelf& WithTxId(ui64 txId) {
12295
Record.TxId = txId;
123-
return *this;
124-
}
125-
126-
TChangeRecordBuilder& WithPathId(const TPathId& pathId) {
127-
Record.PathId = pathId;
128-
return *this;
129-
}
130-
131-
TChangeRecordBuilder& WithTableId(const TPathId& tableId) {
132-
Record.TableId = tableId;
133-
return *this;
134-
}
135-
136-
TChangeRecordBuilder& WithSchemaVersion(ui64 version) {
137-
Record.SchemaVersion = version;
138-
return *this;
139-
}
140-
141-
TChangeRecordBuilder& WithSchema(TUserTable::TCPtr schema) {
142-
Record.Schema = schema;
143-
return *this;
96+
return static_cast<TSelf&>(*this);
14497
}
14598

146-
TChangeRecordBuilder& WithBody(const TString& body) {
99+
TSelf& WithBody(const TString& body) {
147100
Record.Body = body;
148-
return *this;
101+
return static_cast<TSelf&>(*this);
149102
}
150103

151-
TChangeRecordBuilder& WithBody(TString&& body) {
104+
TSelf& WithBody(TString&& body) {
152105
Record.Body = std::move(body);
153-
return *this;
106+
return static_cast<TSelf&>(*this);
154107
}
155108

156-
TChangeRecordBuilder& WithSource(ESource source) {
109+
TSelf& WithSource(ESource source) {
157110
Record.Source = source;
158-
return *this;
111+
return static_cast<TSelf&>(*this);
159112
}
160113

161-
TChangeRecord&& Build() {
114+
T&& Build() {
162115
return std::move(Record);
163116
}
164117

165-
private:
166-
TChangeRecord Record;
118+
protected:
119+
T Record;
167120

168121
}; // TChangeRecordBuilder
169122

170123
}
171124

172-
Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TChangeRecord, out, value) {
125+
Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TChangeRecord, out, value) {
173126
return value.Out(out);
174127
}

ydb/core/change_exchange/ya.make

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
change_record.cpp
5+
)
6+
7+
GENERATE_ENUM_SERIALIZATION(change_record.h)
8+
9+
PEERDIR(
10+
ydb/core/protos
11+
ydb/core/scheme
12+
)
13+
14+
YQL_LAST_ABI_VERSION()
15+
16+
END()

ydb/core/tx/datashard/change_record.cpp

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,69 +5,9 @@
55
namespace NKikimr::NDataShard {
66

77
void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const {
8-
record.SetOrder(Order);
9-
record.SetGroup(Group);
10-
record.SetStep(Step);
11-
record.SetTxId(TxId);
8+
NChangeExchange::TChangeRecord::Serialize(record);
129
record.SetPathOwnerId(PathId.OwnerId);
1310
record.SetLocalPathId(PathId.LocalPathId);
14-
15-
switch (Kind) {
16-
case EKind::AsyncIndex: {
17-
Y_ABORT_UNLESS(record.MutableAsyncIndex()->ParseFromArray(Body.data(), Body.size()));
18-
break;
19-
}
20-
case EKind::CdcDataChange: {
21-
Y_ABORT_UNLESS(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size()));
22-
break;
23-
}
24-
case EKind::CdcHeartbeat: {
25-
break;
26-
}
27-
}
28-
}
29-
30-
static auto ParseBody(const TString& protoBody) {
31-
NKikimrChangeExchange::TDataChange body;
32-
Y_ABORT_UNLESS(body.ParseFromArray(protoBody.data(), protoBody.size()));
33-
return body;
34-
}
35-
36-
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
37-
if (Key) {
38-
return *Key;
39-
}
40-
41-
switch (Kind) {
42-
case EKind::AsyncIndex:
43-
case EKind::CdcDataChange: {
44-
const auto parsed = ParseBody(Body);
45-
46-
TSerializedCellVec key;
47-
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
48-
49-
Key.ConstructInPlace(key.GetCells());
50-
break;
51-
}
52-
53-
case EKind::CdcHeartbeat: {
54-
Y_ABORT("Not supported");
55-
}
56-
}
57-
58-
Y_ABORT_UNLESS(Key);
59-
return *Key;
60-
}
61-
62-
i64 TChangeRecord::GetSeqNo() const {
63-
Y_ABORT_UNLESS(Order <= Max<i64>());
64-
return static_cast<i64>(Order);
65-
}
66-
67-
TInstant TChangeRecord::GetApproximateCreationDateTime() const {
68-
return GetGroup()
69-
? TInstant::MicroSeconds(GetGroup())
70-
: TInstant::MilliSeconds(GetStep());
7111
}
7212

7313
bool TChangeRecord::IsBroadcast() const {
@@ -79,13 +19,6 @@ bool TChangeRecord::IsBroadcast() const {
7919
}
8020
}
8121

82-
TString TChangeRecord::ToString() const {
83-
TString result;
84-
TStringOutput out(result);
85-
Out(out);
86-
return result;
87-
}
88-
8922
void TChangeRecord::Out(IOutputStream& out) const {
9023
out << "{"
9124
<< " Order: " << Order

0 commit comments

Comments
 (0)