Skip to content

Commit 25431f4

Browse files
authored
Common TChangeRecord in ydb/core/change_exchange (#737)
1 parent a32b0c1 commit 25431f4

File tree

7 files changed

+252
-176
lines changed

7 files changed

+252
-176
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#include "change_record.h"
2+
3+
#include <ydb/core/protos/change_exchange.pb.h>
4+
5+
namespace NKikimr::NChangeExchange {
6+
7+
void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const {
8+
record.SetOrder(Order);
9+
record.SetGroup(Group);
10+
record.SetStep(Step);
11+
record.SetTxId(TxId);
12+
13+
switch (Kind) {
14+
case EKind::AsyncIndex: {
15+
Y_ABORT_UNLESS(record.MutableAsyncIndex()->ParseFromArray(Body.data(), Body.size()));
16+
break;
17+
}
18+
case EKind::CdcDataChange: {
19+
Y_ABORT_UNLESS(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size()));
20+
break;
21+
}
22+
case EKind::CdcHeartbeat: {
23+
break;
24+
}
25+
}
26+
}
27+
28+
static auto ParseBody(const TString& protoBody) {
29+
NKikimrChangeExchange::TDataChange body;
30+
Y_ABORT_UNLESS(body.ParseFromArray(protoBody.data(), protoBody.size()));
31+
return body;
32+
}
33+
34+
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
35+
if (Key) {
36+
return *Key;
37+
}
38+
39+
switch (Kind) {
40+
case EKind::AsyncIndex:
41+
case EKind::CdcDataChange: {
42+
const auto parsed = ParseBody(Body);
43+
44+
TSerializedCellVec key;
45+
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
46+
47+
Key.ConstructInPlace(key.GetCells());
48+
break;
49+
}
50+
51+
case EKind::CdcHeartbeat: {
52+
Y_ABORT("Not supported");
53+
}
54+
}
55+
56+
Y_ABORT_UNLESS(Key);
57+
return *Key;
58+
}
59+
60+
i64 TChangeRecord::GetSeqNo() const {
61+
Y_ABORT_UNLESS(Order <= Max<i64>());
62+
return static_cast<i64>(Order);
63+
}
64+
65+
TInstant TChangeRecord::GetApproximateCreationDateTime() const {
66+
return GetGroup()
67+
? TInstant::MicroSeconds(GetGroup())
68+
: TInstant::MilliSeconds(GetStep());
69+
}
70+
71+
TString TChangeRecord::ToString() const {
72+
TString result;
73+
TStringOutput out(result);
74+
Out(out);
75+
return result;
76+
}
77+
78+
void TChangeRecord::Out(IOutputStream& out) const {
79+
out << "{"
80+
<< " Order: " << Order
81+
<< " Group: " << Group
82+
<< " Step: " << Step
83+
<< " TxId: " << TxId
84+
<< " Kind: " << Kind
85+
<< " Source: " << Source
86+
<< " Body: " << Body.size() << "b"
87+
<< " }";
88+
}
89+
90+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#pragma once
2+
3+
#include <ydb/core/scheme/scheme_tablecell.h>
4+
5+
#include <util/generic/maybe.h>
6+
#include <util/generic/string.h>
7+
8+
namespace NKikimrChangeExchange {
9+
class TChangeRecord;
10+
}
11+
12+
namespace NKikimr::NChangeExchange {
13+
14+
template <typename T, typename TDerived> class TChangeRecordBuilder;
15+
16+
class TChangeRecord {
17+
template <typename T, typename TDerived> friend class TChangeRecordBuilder;
18+
19+
public:
20+
enum class ESource: ui8 {
21+
Unspecified = 0,
22+
InitialScan = 1,
23+
};
24+
25+
enum class EKind: ui8 {
26+
AsyncIndex,
27+
CdcDataChange,
28+
CdcHeartbeat,
29+
};
30+
31+
public:
32+
ui64 GetOrder() const { return Order; }
33+
ui64 GetGroup() const { return Group; }
34+
ui64 GetStep() const { return Step; }
35+
ui64 GetTxId() const { return TxId; }
36+
EKind GetKind() const { return Kind; }
37+
const TString& GetBody() const { return Body; }
38+
ESource GetSource() const { return Source; }
39+
40+
void Serialize(NKikimrChangeExchange::TChangeRecord& record) const;
41+
42+
TConstArrayRef<TCell> GetKey() const;
43+
i64 GetSeqNo() const;
44+
TInstant GetApproximateCreationDateTime() const;
45+
46+
TString ToString() const;
47+
void Out(IOutputStream& out) const;
48+
49+
protected:
50+
ui64 Order = Max<ui64>();
51+
ui64 Group = 0;
52+
ui64 Step = 0;
53+
ui64 TxId = 0;
54+
EKind Kind;
55+
TString Body;
56+
ESource Source = ESource::Unspecified;
57+
58+
mutable TMaybe<TOwnedCellVec> Key;
59+
mutable TMaybe<TString> PartitionKey;
60+
61+
}; // TChangeRecord
62+
63+
template <typename T, typename TDerived>
64+
class TChangeRecordBuilder {
65+
protected:
66+
using TSelf = TDerived;
67+
using EKind = TChangeRecord::EKind;
68+
using ESource = TChangeRecord::ESource;
69+
70+
public:
71+
explicit TChangeRecordBuilder(EKind kind) {
72+
Record.Kind = kind;
73+
}
74+
75+
explicit TChangeRecordBuilder(T&& record) {
76+
Record = std::move(record);
77+
}
78+
79+
TSelf& WithOrder(ui64 order) {
80+
Record.Order = order;
81+
return static_cast<TSelf&>(*this);
82+
}
83+
84+
TSelf& WithGroup(ui64 group) {
85+
Record.Group = group;
86+
return static_cast<TSelf&>(*this);
87+
}
88+
89+
TSelf& WithStep(ui64 step) {
90+
Record.Step = step;
91+
return static_cast<TSelf&>(*this);
92+
}
93+
94+
TSelf& WithTxId(ui64 txId) {
95+
Record.TxId = txId;
96+
return static_cast<TSelf&>(*this);
97+
}
98+
99+
TSelf& WithBody(const TString& body) {
100+
Record.Body = body;
101+
return static_cast<TSelf&>(*this);
102+
}
103+
104+
TSelf& WithBody(TString&& body) {
105+
Record.Body = std::move(body);
106+
return static_cast<TSelf&>(*this);
107+
}
108+
109+
TSelf& WithSource(ESource source) {
110+
Record.Source = source;
111+
return static_cast<TSelf&>(*this);
112+
}
113+
114+
T&& Build() {
115+
return std::move(Record);
116+
}
117+
118+
protected:
119+
T Record;
120+
121+
}; // TChangeRecordBuilder
122+
123+
}
124+
125+
Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TChangeRecord, out, value) {
126+
return value.Out(out);
127+
}

ydb/core/change_exchange/ya.make

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
change_record.cpp
5+
)
6+
7+
GENERATE_ENUM_SERIALIZATION(change_record.h)
8+
9+
PEERDIR(
10+
ydb/core/protos
11+
ydb/core/scheme
12+
)
13+
14+
YQL_LAST_ABI_VERSION()
15+
16+
END()

ydb/core/tx/datashard/change_record.cpp

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,69 +5,9 @@
55
namespace NKikimr::NDataShard {
66

77
void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const {
8-
record.SetOrder(Order);
9-
record.SetGroup(Group);
10-
record.SetStep(Step);
11-
record.SetTxId(TxId);
8+
NChangeExchange::TChangeRecord::Serialize(record);
129
record.SetPathOwnerId(PathId.OwnerId);
1310
record.SetLocalPathId(PathId.LocalPathId);
14-
15-
switch (Kind) {
16-
case EKind::AsyncIndex: {
17-
Y_ABORT_UNLESS(record.MutableAsyncIndex()->ParseFromArray(Body.data(), Body.size()));
18-
break;
19-
}
20-
case EKind::CdcDataChange: {
21-
Y_ABORT_UNLESS(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size()));
22-
break;
23-
}
24-
case EKind::CdcHeartbeat: {
25-
break;
26-
}
27-
}
28-
}
29-
30-
static auto ParseBody(const TString& protoBody) {
31-
NKikimrChangeExchange::TDataChange body;
32-
Y_ABORT_UNLESS(body.ParseFromArray(protoBody.data(), protoBody.size()));
33-
return body;
34-
}
35-
36-
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
37-
if (Key) {
38-
return *Key;
39-
}
40-
41-
switch (Kind) {
42-
case EKind::AsyncIndex:
43-
case EKind::CdcDataChange: {
44-
const auto parsed = ParseBody(Body);
45-
46-
TSerializedCellVec key;
47-
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
48-
49-
Key.ConstructInPlace(key.GetCells());
50-
break;
51-
}
52-
53-
case EKind::CdcHeartbeat: {
54-
Y_ABORT("Not supported");
55-
}
56-
}
57-
58-
Y_ABORT_UNLESS(Key);
59-
return *Key;
60-
}
61-
62-
i64 TChangeRecord::GetSeqNo() const {
63-
Y_ABORT_UNLESS(Order <= Max<i64>());
64-
return static_cast<i64>(Order);
65-
}
66-
67-
TInstant TChangeRecord::GetApproximateCreationDateTime() const {
68-
return GetGroup()
69-
? TInstant::MicroSeconds(GetGroup())
70-
: TInstant::MilliSeconds(GetStep());
7111
}
7212

7313
bool TChangeRecord::IsBroadcast() const {
@@ -79,13 +19,6 @@ bool TChangeRecord::IsBroadcast() const {
7919
}
8020
}
8121

82-
TString TChangeRecord::ToString() const {
83-
TString result;
84-
TStringOutput out(result);
85-
Out(out);
86-
return result;
87-
}
88-
8922
void TChangeRecord::Out(IOutputStream& out) const {
9023
out << "{"
9124
<< " Order: " << Order

0 commit comments

Comments
 (0)