Skip to content

Commit a46988a

Browse files
committed
(refactoring) Split TEvChangeExchange into two parts: common & DS KIKIMR-20673
1 parent ad7b438 commit a46988a

14 files changed

+289
-258
lines changed

ydb/core/base/events.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents {
135135
ES_HEALTH_CHECK,
136136
ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212
137137
ES_YQ, // 4213
138-
ES_CHANGE_EXCHANGE,
138+
ES_CHANGE_EXCHANGE_DATASHARD,
139139
ES_DATABASE_SERVICE, //4215
140140
ES_SEQUENCESHARD, // 4216
141141
ES_SEQUENCEPROXY, // 4217
@@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents {
172172
ES_PQ_PARTITION_CHOOSER,
173173
ES_GRAPH,
174174
ES_REPLICATION_SERVICE,
175+
ES_CHANGE_EXCHANGE,
175176
};
176177
};
177178

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "change_exchange.h"
2+
3+
#include <util/string/builder.h>
4+
#include <util/string/join.h>
5+
6+
namespace NKikimr::NChangeExchange {
7+
8+
/// TEvEnqueueRecords
9+
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
10+
: Records(records)
11+
{
12+
}
13+
14+
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
15+
: Records(std::move(records))
16+
{
17+
}
18+
19+
TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
20+
return TStringBuilder() << ToStringHeader() << " {"
21+
<< " Records [" << JoinSeq(",", Records) << "]"
22+
<< " }";
23+
}
24+
25+
TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
26+
: Order(order)
27+
, PathId(pathId)
28+
, BodySize(bodySize)
29+
{
30+
}
31+
32+
void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
33+
out << "{"
34+
<< " Order: " << Order
35+
<< " PathId: " << PathId
36+
<< " BodySize: " << BodySize
37+
<< " }";
38+
}
39+
40+
/// TEvRequestRecords
41+
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
42+
: Records(records)
43+
{
44+
}
45+
46+
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
47+
: Records(std::move(records))
48+
{
49+
}
50+
51+
TString TEvChangeExchange::TEvRequestRecords::ToString() const {
52+
return TStringBuilder() << ToStringHeader() << " {"
53+
<< " Records [" << JoinSeq(",", Records) << "]"
54+
<< " }";
55+
}
56+
57+
TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
58+
: Order(order)
59+
, BodySize(bodySize)
60+
{
61+
}
62+
63+
bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
64+
return Order < rhs.Order;
65+
}
66+
67+
void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
68+
out << "{"
69+
<< " Order: " << Order
70+
<< " BodySize: " << BodySize
71+
<< " }";
72+
}
73+
74+
/// TEvRemoveRecords
75+
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
76+
: Records(records)
77+
{
78+
}
79+
80+
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
81+
: Records(std::move(records))
82+
{
83+
}
84+
85+
TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
86+
return TStringBuilder() << ToStringHeader() << " {"
87+
<< " Records [" << JoinSeq(",", Records) << "]"
88+
<< " }";
89+
}
90+
91+
/// TEvRecords
92+
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records)
93+
: Records(records)
94+
{
95+
}
96+
97+
TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records)
98+
: Records(std::move(records))
99+
{
100+
}
101+
102+
TString TEvChangeExchange::TEvRecords::ToString() const {
103+
return TStringBuilder() << ToStringHeader() << " {"
104+
<< " Records [" << JoinSeq(",", Records) << "]"
105+
<< " }";
106+
}
107+
108+
/// TEvForgetRecords
109+
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
110+
: Records(records)
111+
{
112+
}
113+
114+
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
115+
: Records(std::move(records))
116+
{
117+
}
118+
119+
TString TEvChangeExchange::TEvForgetRecords::ToString() const {
120+
return TStringBuilder() << ToStringHeader() << " {"
121+
<< " Records [" << JoinSeq(",", Records) << "]"
122+
<< " }";
123+
}
124+
125+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#pragma once
2+
3+
#include "change_record.h"
4+
5+
#include <ydb/core/base/defs.h>
6+
#include <ydb/core/base/events.h>
7+
#include <ydb/core/scheme/scheme_pathid.h>
8+
9+
#include <util/generic/vector.h>
10+
11+
namespace NKikimr::NChangeExchange {
12+
13+
struct TEvChangeExchange {
14+
enum EEv {
15+
// Enqueue for sending
16+
EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE),
17+
// Request change record(s) by id
18+
EvRequestRecords,
19+
// Change record(s)
20+
EvRecords,
21+
// Remove change record(s) from local database
22+
EvRemoveRecords,
23+
// Already removed records that the sender should forget about
24+
EvForgetRecods,
25+
26+
EvEnd,
27+
};
28+
29+
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE));
30+
31+
struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> {
32+
struct TRecordInfo {
33+
ui64 Order;
34+
TPathId PathId;
35+
ui64 BodySize;
36+
37+
TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize);
38+
39+
void Out(IOutputStream& out) const;
40+
};
41+
42+
TVector<TRecordInfo> Records;
43+
44+
explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records);
45+
explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records);
46+
TString ToString() const override;
47+
};
48+
49+
struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> {
50+
struct TRecordInfo {
51+
ui64 Order;
52+
ui64 BodySize;
53+
54+
TRecordInfo(ui64 order, ui64 bodySize = 0);
55+
56+
bool operator<(const TRecordInfo& rhs) const;
57+
void Out(IOutputStream& out) const;
58+
};
59+
60+
TVector<TRecordInfo> Records;
61+
62+
explicit TEvRequestRecords(const TVector<TRecordInfo>& records);
63+
explicit TEvRequestRecords(TVector<TRecordInfo>&& records);
64+
TString ToString() const override;
65+
};
66+
67+
struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> {
68+
TVector<ui64> Records;
69+
70+
explicit TEvRemoveRecords(const TVector<ui64>& records);
71+
explicit TEvRemoveRecords(TVector<ui64>&& records);
72+
TString ToString() const override;
73+
};
74+
75+
struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
76+
TVector<IChangeRecord::TPtr> Records;
77+
78+
explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records);
79+
explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records);
80+
TString ToString() const override;
81+
};
82+
83+
struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
84+
TVector<ui64> Records;
85+
86+
explicit TEvForgetRecords(const TVector<ui64>& records);
87+
explicit TEvForgetRecords(TVector<ui64>&& records);
88+
TString ToString() const override;
89+
};
90+
91+
}; // TEvChangeExchange
92+
93+
}
94+
95+
Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) {
96+
return x.Out(o);
97+
}
98+
99+
Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) {
100+
return x.Out(o);
101+
}

ydb/core/change_exchange/ya.make

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
LIBRARY()
22

33
SRCS(
4+
change_exchange.cpp
45
change_record.cpp
56
)
67

78
GENERATE_ENUM_SERIALIZATION(change_record.h)
89

10+
PEERDIR(
11+
ydb/core/base
12+
ydb/core/scheme
13+
)
14+
915
YQL_LAST_ABI_VERSION()
1016

1117
END()
Lines changed: 2 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,127 +1,8 @@
11
#include "change_exchange.h"
22

33
#include <util/string/builder.h>
4-
#include <util/string/join.h>
54

6-
namespace NKikimr {
7-
namespace NDataShard {
8-
9-
/// TEvEnqueueRecords
10-
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
11-
: Records(records)
12-
{
13-
}
14-
15-
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
16-
: Records(std::move(records))
17-
{
18-
}
19-
20-
TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
21-
return TStringBuilder() << ToStringHeader() << " {"
22-
<< " Records [" << JoinSeq(",", Records) << "]"
23-
<< " }";
24-
}
25-
26-
TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
27-
: Order(order)
28-
, PathId(pathId)
29-
, BodySize(bodySize)
30-
{
31-
}
32-
33-
void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
34-
out << "{"
35-
<< " Order: " << Order
36-
<< " PathId: " << PathId
37-
<< " BodySize: " << BodySize
38-
<< " }";
39-
}
40-
41-
/// TEvRequestRecords
42-
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
43-
: Records(records)
44-
{
45-
}
46-
47-
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
48-
: Records(std::move(records))
49-
{
50-
}
51-
52-
TString TEvChangeExchange::TEvRequestRecords::ToString() const {
53-
return TStringBuilder() << ToStringHeader() << " {"
54-
<< " Records [" << JoinSeq(",", Records) << "]"
55-
<< " }";
56-
}
57-
58-
TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
59-
: Order(order)
60-
, BodySize(bodySize)
61-
{
62-
}
63-
64-
bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
65-
return Order < rhs.Order;
66-
}
67-
68-
void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
69-
out << "{"
70-
<< " Order: " << Order
71-
<< " BodySize: " << BodySize
72-
<< " }";
73-
}
74-
75-
/// TEvRemoveRecords
76-
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
77-
: Records(records)
78-
{
79-
}
80-
81-
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
82-
: Records(std::move(records))
83-
{
84-
}
85-
86-
TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
87-
return TStringBuilder() << ToStringHeader() << " {"
88-
<< " Records [" << JoinSeq(",", Records) << "]"
89-
<< " }";
90-
}
91-
92-
/// TEvRecords
93-
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records)
94-
: Records(records)
95-
{
96-
}
97-
98-
TEvChangeExchange::TEvRecords::TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records)
99-
: Records(std::move(records))
100-
{
101-
}
102-
103-
TString TEvChangeExchange::TEvRecords::ToString() const {
104-
return TStringBuilder() << ToStringHeader() << " {"
105-
<< " Records [" << JoinSeq(",", Records) << "]"
106-
<< " }";
107-
}
108-
109-
/// TEvForgetRecords
110-
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
111-
: Records(records)
112-
{
113-
}
114-
115-
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
116-
: Records(std::move(records))
117-
{
118-
}
119-
120-
TString TEvChangeExchange::TEvForgetRecords::ToString() const {
121-
return TStringBuilder() << ToStringHeader() << " {"
122-
<< " Records [" << JoinSeq(",", Records) << "]"
123-
<< " }";
124-
}
5+
namespace NKikimr::NDataShard {
1256

1267
/// TEvAddSender
1278
TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId)
@@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const {
15132
<< " }";
15233
}
15334

154-
} // NDataShard
155-
} // NKikimr
35+
}

0 commit comments

Comments
 (0)