Skip to content

Commit 4ab126d

Browse files
authored
24-4 Allow to send duplicates through TDedicatedPipePool (#18148)
2 parents 053400b + df21e80 commit 4ab126d

File tree

3 files changed

+32
-27
lines changed

3 files changed

+32
-27
lines changed

ydb/core/tx/schemeshard/dedicated_pipe_pool.h

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@ class TDedicatedPipePool {
1717
TMap<TActorId, std::pair<TEntityId, TTabletId>> Owners;
1818

1919
public:
20-
void Create(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
21-
Y_ABORT_UNLESS(!Pipes[entityId].contains(dst));
20+
void Send(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
2221
using namespace NTabletPipe;
2322

24-
const auto clientId = ctx.ExecutorThread.RegisterActor(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
25-
.MinRetryTime = TDuration::MilliSeconds(100),
26-
.MaxRetryTime = TDuration::Seconds(30),
27-
}));
23+
if (!Pipes[entityId].contains(dst)) {
24+
const auto clientId = ctx.ExecutorThread.RegisterActor(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
25+
.MinRetryTime = TDuration::MilliSeconds(100),
26+
.MaxRetryTime = TDuration::Seconds(30),
27+
}));
2828

29-
Pipes[entityId][dst] = clientId;
30-
Owners[clientId] = std::make_pair(entityId, dst);
29+
Pipes[entityId][dst] = clientId;
30+
Owners[clientId] = std::make_pair(entityId, dst);
31+
}
3132

33+
const auto clientId = Pipes[entityId][dst];
34+
Y_ABORT_UNLESS(Owners[clientId] == std::make_pair(entityId, dst));
3235
SendData(ctx.SelfID, clientId, message.Release(), 0);
3336
}
3437

@@ -55,10 +58,10 @@ class TDedicatedPipePool {
5558
}
5659
}
5760

58-
ui64 CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
61+
void CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
5962
auto entityIt = Pipes.find(entityId);
6063
if (entityIt == Pipes.end()) {
61-
return 0;
64+
return;
6265
}
6366

6467
const auto& entityPipes = entityIt->second;
@@ -71,8 +74,6 @@ class TDedicatedPipePool {
7174
for (const auto& tabletId : tablets) {
7275
Close(entityId, tabletId, ctx);
7376
}
74-
75-
return tablets.size();
7677
}
7778

7879
void Shutdown(const TActorContext& ctx) {

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
194194
private:
195195
TIndexBuildId BuildId;
196196

197-
TDeque<std::tuple<TTabletId, ui64, THolder<IEventBase>>> ToTabletSend;
197+
TMap<TTabletId, THolder<IEventBase>> ToTabletSend;
198198

199199
public:
200200
explicit TTxProgress(TSelf* self, TIndexBuildId id)
@@ -206,10 +206,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
206206
Y_ABORT_UNLESS(Self->IndexBuilds.contains(BuildId));
207207
TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(BuildId);
208208

209-
LOG_I("TTxBuildProgress: Resume"
210-
<< ": id# " << BuildId);
211-
LOG_D("TTxBuildProgress: Resume"
212-
<< ": " << *buildInfo);
209+
LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo->State);
210+
LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo->State << " " << *buildInfo);
213211

214212
switch (buildInfo->State) {
215213
case TIndexBuildInfo::EState::Invalid:
@@ -263,6 +261,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
263261
buildInfo->DoneShards.clear();
264262
buildInfo->InProgressShards.clear();
265263

264+
ToTabletSend.clear();
266265
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
267266

268267
ChangeState(BuildId, TIndexBuildInfo::EState::Cancellation_Applying);
@@ -283,6 +282,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
283282
&& buildInfo->DoneShards.empty()
284283
&& buildInfo->InProgressShards.empty())
285284
{
285+
ToTabletSend.clear();
286+
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
287+
286288
for (const auto& item: buildInfo->Shards) {
287289
const TIndexBuildInfo::TShardStatus& shardStatus = item.second;
288290
switch (shardStatus.Status) {
@@ -374,16 +376,18 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
374376
ev->Record.SetSeqNoGeneration(Self->Generation());
375377
ev->Record.SetSeqNoRound(++shardStatus.SeqNoRound);
376378

377-
LOG_D("TTxBuildProgress: TEvBuildIndexCreateRequest"
379+
LOG_N("TTxBuildProgress: TEvBuildIndexCreateRequest"
378380
<< ": " << ev->Record.ShortDebugString());
379381

380-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
382+
ToTabletSend.emplace(shardId, std::move(ev));
381383
}
382384

383385
if (buildInfo->InProgressShards.empty() && buildInfo->ToUploadShards.empty()
384-
&& buildInfo->DoneShards.size() == buildInfo->Shards.size()) {
386+
&& buildInfo->DoneShards.size() == buildInfo->Shards.size())
387+
{
385388
// all done
386-
Y_ABORT_UNLESS(0 == Self->IndexBuildPipes.CloseAll(BuildId, ctx));
389+
ToTabletSend.clear();
390+
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
387391

388392
ChangeState(BuildId, TIndexBuildInfo::EState::Applying);
389393
Progress(BuildId);
@@ -507,8 +511,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
507511
}
508512

509513
void DoComplete(const TActorContext& ctx) override {
510-
for (auto& x: ToTabletSend) {
511-
Self->IndexBuildPipes.Create(BuildId, std::get<0>(x), std::move(std::get<2>(x)), ctx);
514+
for (auto& [shardId, ev]: ToTabletSend) {
515+
Self->IndexBuildPipes.Send(BuildId, shardId, std::move(ev), ctx);
512516
}
513517
ToTabletSend.clear();
514518
}
@@ -620,7 +624,7 @@ struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder
620624
const auto& tabletId = PipeRetry.TabletId;
621625
const auto& shardIdx = Self->GetShardIdx(tabletId);
622626

623-
LOG_I("TTxReply : PipeRetry"
627+
LOG_N("TTxReply : PipeRetry"
624628
<< ", buildIndexId# " << buildId
625629
<< ", tabletId# " << tabletId
626630
<< ", shardIdx# " << shardIdx);
@@ -999,8 +1003,8 @@ struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder
9991003
<< ", BuildIndexId: " << buildInfo->Id
10001004
<< ", status: " << Ydb::StatusIds::StatusCode_Name(status)
10011005
<< ", error: " << buildInfo->Issue
1002-
<< ", replyTo: " << buildInfo->CreateSender.ToString());
1003-
LOG_D("Message:\n" << responseEv->Record.ShortDebugString());
1006+
<< ", replyTo: " << buildInfo->CreateSender.ToString()
1007+
<< ", message: " << responseEv->Record.ShortDebugString());
10041008

10051009
Send(buildInfo->CreateSender, std::move(responseEv), 0, buildInfo->SenderCookie);
10061010
}

ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
114114

115115
void Complete(const TActorContext& ctx) override {
116116
for (auto& [streamPathId, tabletId, ev] : ScanRequests) {
117-
Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx);
117+
Self->CdcStreamScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx);
118118
}
119119

120120
if (StreamToProgress) {

0 commit comments

Comments
 (0)