Skip to content

Commit 02cd51c

Browse files
committed
Move TBaseChangeSender to ydb/core/change_exchange KIKIMR-20673
1 parent 59c0776 commit 02cd51c

File tree

10 files changed

+75
-65
lines changed

10 files changed

+75
-65
lines changed

ydb/core/tx/datashard/change_sender_common_ops.cpp renamed to ydb/core/change_exchange/change_sender_common_ops.cpp

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#include "change_sender_common_ops.h"
22
#include "change_sender_monitoring.h"
33

4+
#include <ydb/library/yverify_stream/yverify_stream.h>
5+
46
#include <library/cpp/monlib/service/pages/mon_page.h>
57
#include <library/cpp/monlib/service/pages/templates.h>
68

79
#include <util/generic/algorithm.h>
810
#include <util/generic/size_literals.h>
911

10-
namespace NKikimr::NDataShard {
12+
namespace NKikimr::NChangeExchange {
1113

1214
void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
1315
auto res = senders.emplace(partitionId, TSender{});
@@ -82,7 +84,7 @@ void TBaseChangeSender::KillSenders() {
8284
}
8385
}
8486

85-
void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
87+
void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
8688
for (auto& record : records) {
8789
Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
8890
<< ": expected# " << PathId
@@ -117,11 +119,11 @@ bool TBaseChangeSender::RequestRecords() {
117119
return false;
118120
}
119121

120-
ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records)));
122+
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
121123
return true;
122124
}
123125

124-
void TBaseChangeSender::ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) {
126+
void TBaseChangeSender::ProcessRecords(TVector<IChangeRecord::TPtr>&& records) {
125127
for (auto& record : records) {
126128
auto it = PendingBody.find(record->GetOrder());
127129
if (it == PendingBody.end()) {
@@ -290,7 +292,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
290292
}
291293

292294
Y_ABORT_UNLESS(sender.ActorId);
293-
ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
295+
ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
294296
}
295297

296298
void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
@@ -306,7 +308,7 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
306308
}
307309
}
308310

309-
TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record) {
311+
TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(IChangeRecord::TPtr record) {
310312
Y_ABORT_UNLESS(record->IsBroadcast());
311313

312314
auto it = Broadcasting.find(record->GetOrder());
@@ -430,17 +432,17 @@ void TBaseChangeSender::RemoveRecords() {
430432
}
431433

432434
TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
433-
const TDataShardId& dataShard, const TPathId& pathId)
435+
const TActorId& changeServer, const TPathId& pathId)
434436
: ActorOps(actorOps)
435437
, Resolver(resolver)
436-
, DataShard(dataShard)
438+
, ChangeServer(changeServer)
437439
, PathId(pathId)
438440
, MemLimit(192_KB)
439441
, MemUsage(0)
440442
{
441443
}
442444

443-
void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev,
445+
void TBaseChangeSender::RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev,
444446
const TActorContext& ctx)
445447
{
446448
const auto& cgi = ev->Get()->Cgi();
@@ -468,7 +470,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
468470
TStringStream html;
469471

470472
HTML(html) {
471-
Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId);
473+
Header(html, "Change sender", tabletId);
472474

473475
SimplePanel(html, "Info", [this](IOutputStream& html) {
474476
HTML(html) {
@@ -479,7 +481,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
479481
}
480482
});
481483

482-
SimplePanel(html, "Partition senders", [this](IOutputStream& html) {
484+
SimplePanel(html, "Partition senders", [this, tabletId](IOutputStream& html) {
483485
HTML(html) {
484486
TABLE_CLASS("table table-hover") {
485487
TABLEHEAD() {
@@ -503,7 +505,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
503505
TABLED() { html << sender.Pending.size(); }
504506
TABLED() { html << sender.Prepared.size(); }
505507
TABLED() { html << sender.Broadcasting.size(); }
506-
TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); }
508+
TABLED() { ActorLink(html, tabletId, PathId, partitionId); }
507509
}
508510
}
509511
}

ydb/core/tx/datashard/change_sender_common_ops.h renamed to ydb/core/change_exchange/change_sender_common_ops.h

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
#pragma once
22

33
#include "change_exchange.h"
4-
#include "change_exchange_helpers.h"
54

6-
#include <ydb/core/base/appdata.h>
7-
#include <ydb/core/change_exchange/change_exchange.h>
85
#include <ydb/library/actors/core/actor.h>
96
#include <ydb/library/actors/core/hfunc.h>
107
#include <ydb/library/actors/core/mon.h>
@@ -14,8 +11,7 @@
1411
#include <util/generic/set.h>
1512
#include <util/string/builder.h>
1613

17-
namespace NKikimr {
18-
namespace NDataShard {
14+
namespace NKikimr::NChangeExchange {
1915

2016
struct TEvChangeExchangePrivate {
2117
enum EEv {
@@ -61,8 +57,8 @@ class IChangeSender {
6157
virtual IActor* CreateSender(ui64 partitionId) = 0;
6258
virtual void RemoveRecords() = 0;
6359

64-
virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
65-
virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0;
60+
virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
61+
virtual void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) = 0;
6662
virtual void ForgetRecords(TVector<ui64>&& records) = 0;
6763
virtual void OnReady(ui64 partitionId) = 0;
6864
virtual void OnGone(ui64 partitionId) = 0;
@@ -75,18 +71,18 @@ class IChangeSenderResolver {
7571
virtual void Resolve() = 0;
7672
virtual bool IsResolving() const = 0;
7773
virtual bool IsResolved() const = 0;
78-
virtual ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const = 0;
74+
virtual ui64 GetPartitionId(IChangeRecord::TPtr record) const = 0;
7975
};
8076

8177
class TBaseChangeSender: public IChangeSender {
82-
using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
83-
using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
78+
using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
79+
using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
8480

8581
struct TSender {
8682
TActorId ActorId;
8783
bool Ready = false;
8884
TVector<TEnqueuedRecord> Pending;
89-
TVector<NChangeExchange::IChangeRecord::TPtr> Prepared;
85+
TVector<IChangeRecord::TPtr> Prepared;
9086
TVector<ui64> Broadcasting;
9187
};
9288

@@ -108,7 +104,7 @@ class TBaseChangeSender: public IChangeSender {
108104
void SendPreparedRecords(ui64 partitionId);
109105
void ReEnqueueRecords(const TSender& sender);
110106

111-
TBroadcast& EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record);
107+
TBroadcast& EnsureBroadcast(IChangeRecord::TPtr record);
112108
bool AddBroadcastPartition(ui64 order, ui64 partitionId);
113109
bool RemoveBroadcastPartition(ui64 order, ui64 partitionId);
114110
bool CompleteBroadcastPartition(ui64 order, ui64 partitionId);
@@ -124,35 +120,35 @@ class TBaseChangeSender: public IChangeSender {
124120
remove.push_back(record.Order);
125121
}
126122

127-
ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
123+
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
128124
}
129125

130126
template <>
131127
void RemoveRecords(TVector<ui64>&& records) {
132-
ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records)));
128+
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
133129
}
134130

135131
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
136132
void KillSenders() override;
137133
void RemoveRecords() override;
138134

139-
void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
140-
void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override;
135+
void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
136+
void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) override;
141137
void ForgetRecords(TVector<ui64>&& records) override;
142138
void OnReady(ui64 partitionId) override;
143139
void OnGone(ui64 partitionId) override;
144140

145141
explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
146-
const TDataShardId& dataShard, const TPathId& pathId);
142+
const TActorId& changeServer, const TPathId& pathId);
147143

148-
void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
144+
void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
149145

150146
private:
151147
IActorOps* const ActorOps;
152148
IChangeSenderResolver* const Resolver;
153149

154150
protected:
155-
const TDataShardId DataShard;
151+
const TActorId ChangeServer;
156152
const TPathId PathId;
157153

158154
private:
@@ -162,12 +158,11 @@ class TBaseChangeSender: public IChangeSender {
162158
THashMap<ui64, TSender> Senders; // ui64 is partition id
163159
TSet<TEnqueuedRecord> Enqueued;
164160
TSet<TRequestedRecord> PendingBody;
165-
TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingSent; // ui64 is order
161+
TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order
166162
THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order
167163

168164
TVector<ui64> GonePartitions;
169165

170166
}; // TBaseChangeSender
171167

172-
} // NDataShard
173-
} // NKikimr
168+
}

ydb/core/tx/datashard/change_sender_monitoring.cpp renamed to ydb/core/change_exchange/change_sender_monitoring.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include <util/string/printf.h>
66
#include <util/string/split.h>
77

8-
namespace NKikimr::NDataShard {
8+
namespace NKikimr::NChangeExchange {
99

1010
void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body) {
1111
HTML(str) {

ydb/core/tx/datashard/change_sender_monitoring.h renamed to ydb/core/change_exchange/change_sender_monitoring.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
#include <util/generic/maybe.h>
88

9-
namespace NKikimr::NDataShard {
9+
namespace NKikimr::NChangeExchange {
1010

1111
template <typename T>
1212
static void Link(IOutputStream& str, const TStringBuf path, const T& title) {

ydb/core/change_exchange/ya.make

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@ LIBRARY()
33
SRCS(
44
change_exchange.cpp
55
change_record.cpp
6+
change_sender_common_ops.cpp
7+
change_sender_monitoring.cpp
68
)
79

810
GENERATE_ENUM_SERIALIZATION(change_record.h)
911

1012
PEERDIR(
1113
ydb/core/base
1214
ydb/core/scheme
15+
ydb/library/actors/core
16+
ydb/library/yverify_stream
17+
library/cpp/monlib/service/pages
1318
)
1419

1520
YQL_LAST_ABI_VERSION()

ydb/core/tx/datashard/change_exchange_split.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#include "change_exchange.h"
22
#include "change_exchange_helpers.h"
3-
#include "change_sender_common_ops.h"
43
#include "datashard_impl.h"
54

65
#include <ydb/core/base/tablet_pipe.h>

ydb/core/tx/datashard/change_sender.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#include "change_exchange.h"
22
#include "change_exchange_impl.h"
3-
#include "change_sender_monitoring.h"
43
#include "datashard_impl.h"
54

65
#include <ydb/core/change_exchange/change_exchange.h>
6+
#include <ydb/core/change_exchange/change_sender_monitoring.h>
77
#include <ydb/library/actors/core/actor.h>
88
#include <ydb/library/actors/core/hfunc.h>
99
#include <ydb/library/actors/core/log.h>
@@ -165,6 +165,8 @@ class TChangeSender: public TActor<TChangeSender> {
165165
}
166166

167167
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
168+
using namespace NChangeExchange;
169+
168170
const auto& cgi = ev->Get()->Cgi();
169171
if (const auto& str = cgi.Get("pathId")) {
170172
if (const auto& pathId = ParsePathId(str)) {

0 commit comments

Comments
 (0)