Skip to content

Commit ba66d9c

Browse files
committed
Merge branch 'cp' into KIKIMR-20306/change-record
2 parents bb2a2c6 + 3165ff3 commit ba66d9c

File tree

2 files changed

+280
-0
lines changed

2 files changed

+280
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#include "change_record.h"
2+
3+
#include <ydb/core/protos/change_exchange.pb.h>
4+
5+
namespace NKikimr::NDataShard {
6+
7+
void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const {
8+
record.SetOrder(Order);
9+
record.SetGroup(Group);
10+
record.SetStep(Step);
11+
record.SetTxId(TxId);
12+
record.SetPathOwnerId(PathId.OwnerId);
13+
record.SetLocalPathId(PathId.LocalPathId);
14+
15+
switch (Kind) {
16+
case EKind::AsyncIndex: {
17+
Y_ABORT_UNLESS(record.MutableAsyncIndex()->ParseFromArray(Body.data(), Body.size()));
18+
break;
19+
}
20+
case EKind::CdcDataChange: {
21+
Y_ABORT_UNLESS(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size()));
22+
break;
23+
}
24+
case EKind::CdcHeartbeat: {
25+
break;
26+
}
27+
}
28+
}
29+
30+
static auto ParseBody(const TString& protoBody) {
31+
NKikimrChangeExchange::TDataChange body;
32+
Y_ABORT_UNLESS(body.ParseFromArray(protoBody.data(), protoBody.size()));
33+
return body;
34+
}
35+
36+
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
37+
if (Key) {
38+
return *Key;
39+
}
40+
41+
switch (Kind) {
42+
case EKind::AsyncIndex:
43+
case EKind::CdcDataChange: {
44+
const auto parsed = ParseBody(Body);
45+
46+
TSerializedCellVec key;
47+
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
48+
49+
Key.ConstructInPlace(key.GetCells());
50+
break;
51+
}
52+
53+
case EKind::CdcHeartbeat: {
54+
Y_ABORT("Not supported");
55+
}
56+
}
57+
58+
Y_ABORT_UNLESS(Key);
59+
return *Key;
60+
}
61+
62+
i64 TChangeRecord::GetSeqNo() const {
63+
Y_ABORT_UNLESS(Order <= Max<i64>());
64+
return static_cast<i64>(Order);
65+
}
66+
67+
TInstant TChangeRecord::GetApproximateCreationDateTime() const {
68+
return GetGroup()
69+
? TInstant::MicroSeconds(GetGroup())
70+
: TInstant::MilliSeconds(GetStep());
71+
}
72+
73+
bool TChangeRecord::IsBroadcast() const {
74+
switch (Kind) {
75+
case EKind::CdcHeartbeat:
76+
return true;
77+
default:
78+
return false;
79+
}
80+
}
81+
82+
TString TChangeRecord::ToString() const {
83+
TString result;
84+
TStringOutput out(result);
85+
Out(out);
86+
return result;
87+
}
88+
89+
void TChangeRecord::Out(IOutputStream& out) const {
90+
out << "{"
91+
<< " Order: " << Order
92+
<< " Group: " << Group
93+
<< " Step: " << Step
94+
<< " TxId: " << TxId
95+
<< " PathId: " << PathId
96+
<< " Kind: " << Kind
97+
<< " Source: " << Source
98+
<< " Body: " << Body.size() << "b"
99+
<< " TableId: " << TableId
100+
<< " SchemaVersion: " << SchemaVersion
101+
<< " LockId: " << LockId
102+
<< " LockOffset: " << LockOffset
103+
<< " }";
104+
}
105+
106+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
#pragma once
2+
3+
#include "datashard_user_table.h"
4+
5+
#include <ydb/core/scheme/scheme_pathid.h>
6+
#include <ydb/core/scheme/scheme_tablecell.h>
7+
8+
#include <util/generic/maybe.h>
9+
#include <util/generic/string.h>
10+
11+
namespace NKikimrChangeExchange {
12+
class TChangeRecord;
13+
}
14+
15+
namespace NKikimr::NDataShard {
16+
17+
class TChangeRecordBuilder;
18+
19+
class TChangeRecord {
20+
friend class TChangeRecordBuilder;
21+
22+
public:
23+
enum class ESource: ui8 {
24+
Unspecified = 0,
25+
InitialScan = 1,
26+
};
27+
28+
enum class EKind: ui8 {
29+
AsyncIndex,
30+
CdcDataChange,
31+
CdcHeartbeat,
32+
};
33+
34+
public:
35+
ui64 GetOrder() const { return Order; }
36+
ui64 GetGroup() const { return Group; }
37+
ui64 GetStep() const { return Step; }
38+
ui64 GetTxId() const { return TxId; }
39+
ui64 GetLockId() const { return LockId; }
40+
ui64 GetLockOffset() const { return LockOffset; }
41+
const TPathId& GetPathId() const { return PathId; }
42+
EKind GetKind() const { return Kind; }
43+
const TString& GetBody() const { return Body; }
44+
ESource GetSource() const { return Source; }
45+
46+
const TPathId& GetTableId() const { return TableId; }
47+
ui64 GetSchemaVersion() const { return SchemaVersion; }
48+
TUserTable::TCPtr GetSchema() const { return Schema; }
49+
50+
void Serialize(NKikimrChangeExchange::TChangeRecord& record) const;
51+
52+
TConstArrayRef<TCell> GetKey() const;
53+
i64 GetSeqNo() const;
54+
TString GetPartitionKey() const;
55+
TInstant GetApproximateCreationDateTime() const;
56+
bool IsBroadcast() const;
57+
58+
TString ToString() const;
59+
void Out(IOutputStream& out) const;
60+
61+
private:
62+
ui64 Order = Max<ui64>();
63+
ui64 Group = 0;
64+
ui64 Step = 0;
65+
ui64 TxId = 0;
66+
ui64 LockId = 0;
67+
ui64 LockOffset = 0;
68+
TPathId PathId;
69+
EKind Kind;
70+
TString Body;
71+
ESource Source = ESource::Unspecified;
72+
73+
ui64 SchemaVersion;
74+
TPathId TableId;
75+
TUserTable::TCPtr Schema;
76+
77+
mutable TMaybe<TOwnedCellVec> Key;
78+
mutable TMaybe<TString> PartitionKey;
79+
80+
}; // TChangeRecord
81+
82+
class TChangeRecordBuilder {
83+
using EKind = TChangeRecord::EKind;
84+
using ESource = TChangeRecord::ESource;
85+
86+
public:
87+
explicit TChangeRecordBuilder(EKind kind) {
88+
Record.Kind = kind;
89+
}
90+
91+
explicit TChangeRecordBuilder(TChangeRecord&& record)
92+
: Record(std::move(record))
93+
{
94+
}
95+
96+
TChangeRecordBuilder& WithLockId(ui64 lockId) {
97+
Record.LockId = lockId;
98+
return *this;
99+
}
100+
101+
TChangeRecordBuilder& WithLockOffset(ui64 lockOffset) {
102+
Record.LockOffset = lockOffset;
103+
return *this;
104+
}
105+
106+
TChangeRecordBuilder& WithOrder(ui64 order) {
107+
Record.Order = order;
108+
return *this;
109+
}
110+
111+
TChangeRecordBuilder& WithGroup(ui64 group) {
112+
Record.Group = group;
113+
return *this;
114+
}
115+
116+
TChangeRecordBuilder& WithStep(ui64 step) {
117+
Record.Step = step;
118+
return *this;
119+
}
120+
121+
TChangeRecordBuilder& WithTxId(ui64 txId) {
122+
Record.TxId = txId;
123+
return *this;
124+
}
125+
126+
TChangeRecordBuilder& WithPathId(const TPathId& pathId) {
127+
Record.PathId = pathId;
128+
return *this;
129+
}
130+
131+
TChangeRecordBuilder& WithTableId(const TPathId& tableId) {
132+
Record.TableId = tableId;
133+
return *this;
134+
}
135+
136+
TChangeRecordBuilder& WithSchemaVersion(ui64 version) {
137+
Record.SchemaVersion = version;
138+
return *this;
139+
}
140+
141+
TChangeRecordBuilder& WithSchema(TUserTable::TCPtr schema) {
142+
Record.Schema = schema;
143+
return *this;
144+
}
145+
146+
TChangeRecordBuilder& WithBody(const TString& body) {
147+
Record.Body = body;
148+
return *this;
149+
}
150+
151+
TChangeRecordBuilder& WithBody(TString&& body) {
152+
Record.Body = std::move(body);
153+
return *this;
154+
}
155+
156+
TChangeRecordBuilder& WithSource(ESource source) {
157+
Record.Source = source;
158+
return *this;
159+
}
160+
161+
TChangeRecord&& Build() {
162+
return std::move(Record);
163+
}
164+
165+
private:
166+
TChangeRecord Record;
167+
168+
}; // TChangeRecordBuilder
169+
170+
}
171+
172+
Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TChangeRecord, out, value) {
173+
return value.Out(out);
174+
}

0 commit comments

Comments
 (0)