Skip to content

Commit 9fd60b3

Browse files
authored
Merge 1c87540 into 03d0bea
2 parents 03d0bea + 1c87540 commit 9fd60b3

29 files changed

+552
-524
lines changed

ydb/core/persqueue/pqrb/mirror_describer.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include <google/protobuf/util/message_differencer.h>
66

7+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("topic", TopicName)
8+
79
using namespace NPersQueue;
810

911
namespace NKikimr {
@@ -80,7 +82,7 @@ void TMirrorDescriber::DescribeTopic(const TActorContext& ctx) {
8082
}
8183

8284
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
83-
Y_ABORT_UNLESS(factory);
85+
PQ_ENSURE(factory);
8486
auto future = factory->GetTopicDescription(Config, CredentialsProvider);
8587
future.Subscribe(
8688
[
@@ -114,7 +116,7 @@ void TMirrorDescriber::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*
114116
CredentialsProvider = nullptr;
115117

116118
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
117-
Y_ABORT_UNLESS(factory);
119+
PQ_ENSURE(factory);
118120
auto future = factory->GetCredentialsProvider(Config.GetCredentials());
119121
future.Subscribe(
120122
[
@@ -174,6 +176,24 @@ TString TMirrorDescriber::GetCurrentState() const {
174176
return "UNKNOWN";
175177
}
176178

179+
bool TMirrorDescriber::OnUnhandledException(const std::exception& exc) {
180+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::PQ_MIRROR_DESCRIBER,
181+
LogDescription() << "unhandled exception " << TypeName(exc) << ": " << exc.what() << Endl
182+
<< TBackTrace::FromCurrentException().PrintToString());
183+
184+
Send(ReadBalancerActorId, new TEvents::TEvPoison());
185+
PassAway();
186+
return true;
187+
}
188+
189+
NActors::IActor* CreateMirrorDescriber(
190+
const NActors::TActorId& readBalancerActorId,
191+
const TString& topicName,
192+
const NKikimrPQ::TMirrorPartitionConfig& config
193+
) {
194+
return new TMirrorDescriber(readBalancerActorId, topicName, config);
195+
}
196+
177197

178198
}// NPQ
179199
}// NKikimr

ydb/core/persqueue/pqrb/mirror_describer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
namespace NKikimr::NPQ {
1515

1616

17-
class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber> {
17+
class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber>
18+
, public IActorExceptionHandler {
1819
private:
1920
static constexpr TDuration INIT_INTERVAL_MAX = TDuration::Seconds(240);
2021
static constexpr TDuration INIT_INTERVAL_START = TDuration::Seconds(1);
@@ -76,7 +77,7 @@ class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber> {
7677
void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx);
7778
void HandleWakeup(const TActorContext& ctx);
7879
void HandleDescriptionResult(TEvPQ::TEvMirrorTopicDescription::TPtr& ev, const TActorContext& ctx);
79-
80+
bool OnUnhandledException(const std::exception&) override;
8081
private:
8182
TActorId ReadBalancerActorId;
8283
TString TopicName;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <ydb/core/persqueue/public/config.h>
4+
#include <ydb/library/actors/core/actorsystem_fwd.h>
5+
6+
namespace NKikimr::NPQ {
7+
8+
NActors::IActor* CreateMirrorDescriber(
9+
const NActors::TActorId& readBalancerActorId,
10+
const TString& topicName,
11+
const NKikimrPQ::TMirrorPartitionConfig& config
12+
);
13+
14+
}

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "read_balancer__txpreinit.h"
44
#include "read_balancer__txwrite.h"
55
#include "read_balancer_log.h"
6-
#include "mirror_describer.h"
6+
#include "mirror_describer_factory.h"
77

88
#include <ydb/core/persqueue/events/internal.h>
99
#include <ydb/core/protos/counters_pq.pb.h>
@@ -13,6 +13,8 @@
1313
#include <library/cpp/string_utils/base64/base64.h>
1414
#include <library/cpp/random_provider/random_provider.h>
1515

16+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("tablet_id", TabletID())("path", Path)("topic", Topic)
17+
1618
namespace NKikimr {
1719
namespace NPQ {
1820

@@ -289,7 +291,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
289291
if (MirrorTopicDescriberActorId) {
290292
ctx.Send(MirrorTopicDescriberActorId, new TEvPQ::TEvChangePartitionConfig(nullptr, TabletConfig));
291293
} else {
292-
MirrorTopicDescriberActorId = ctx.Register(new TMirrorDescriber(SelfId(), Topic, TabletConfig.GetPartitionConfig().GetMirrorFrom()));
294+
MirrorTopicDescriberActorId = ctx.Register(CreateMirrorDescriber(SelfId(), Topic, TabletConfig.GetPartitionConfig().GetMirrorFrom()));
293295
}
294296
} else {
295297
if (MirrorTopicDescriberActorId) {
@@ -320,7 +322,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
320322
for (auto& p : record.GetPartitions()) {
321323
auto it = PartitionsInfo.find(p.GetPartition());
322324
if (it == PartitionsInfo.end()) {
323-
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
325+
PQ_ENSURE(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
324326

325327
partitionsInfo[p.GetPartition()] = {p.GetTabletId()};
326328

@@ -422,7 +424,7 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor
422424
pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
423425
TabletPipes[tabletId].PipeActor = pipeClient;
424426
auto res = PipesRequested.insert(tabletId);
425-
Y_ABORT_UNLESS(res.second);
427+
PQ_ENSURE(res.second);
426428
} else {
427429
pipeClient = it->second.PipeActor;
428430
}

ydb/core/persqueue/pqrb/read_balancer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class TMetricsTimeKeeper {
5454
};
5555

5656

57-
class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTabletExecutedFlat {
57+
class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>,
58+
public TTabletExecutedFlat {
5859
struct TTxPreInit;
5960
struct TTxInit;
6061
struct TTxWrite;

ydb/core/persqueue/pqrb/read_balancer__txinit.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
3333
return false;
3434

3535
while (!dataRowset.EndOfSet()) { //found out topic info
36-
Y_ABORT_UNLESS(!Self->Inited);
36+
AFL_ENSURE(!Self->Inited)("tablet_id", Self->TabletID());
3737
Self->PathId = dataRowset.GetValue<Schema::Data::PathId>();
3838
Self->Topic = dataRowset.GetValue<Schema::Data::Topic>();
3939
Self->Path = dataRowset.GetValue<Schema::Data::Path>();
@@ -50,7 +50,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
5050
TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>("");
5151
if (!config.empty()) {
5252
bool res = Self->TabletConfig.ParseFromString(config);
53-
Y_ABORT_UNLESS(res);
53+
AFL_ENSURE(res)("tablet_id", Self->TabletID())("path", Self->Path)("topic", Self->Topic);
5454

5555
Migrate(Self->TabletConfig);
5656
Self->Consumers.clear();

ydb/core/persqueue/pqrb/read_balancer__txwrite.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ struct TPersQueueReadBalancer::TTxWrite : public ITransaction {
4848
NIceDb::TNiceDb db(txc.DB);
4949
TString config;
5050
bool res = Self->TabletConfig.SerializeToString(&config);
51-
Y_ABORT_UNLESS(res);
51+
AFL_ENSURE(res)("tablet_id", Self->TabletID())("path", Self->Path)("topic", Self->Topic);
5252
db.Table<Schema::Data>().Key(1).Update(
5353
NIceDb::TUpdate<Schema::Data::PathId>(Self->PathId),
5454
NIceDb::TUpdate<Schema::Data::Topic>(Self->Topic),

ydb/core/persqueue/pqtablet/cache/cache_eviction.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "pq_l2_service.h"
55

66
#include <ydb/core/base/appdata.h>
7+
#include <ydb/core/keyvalue/keyvalue_events.h>
78
#include <ydb/core/persqueue/events/internal.h>
89
#include <ydb/core/persqueue/pqtablet/blob/blob.h>
910

ydb/core/persqueue/pqtablet/cache/read.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#pragma once
22

3-
#include <ydb/core/persqueue/pqtablet/partition/partition.h>
43
#include "pq_l2_service.h"
54
#include "cache_eviction.h"
65

ydb/core/persqueue/pqtablet/partition/account_read_quoter.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include <util/string/join.h>
1313
#include <util/string/vector.h>
1414

15+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("tablet_id", TabletId)("partition_id", Partition)
16+
1517
namespace NKikimr {
1618
namespace NPQ {
1719

@@ -96,7 +98,7 @@ void TBasicAccountQuoter::HandleQuotaConsumed(NAccountQuoterEvents::TEvConsumed:
9698
<< ", consumed in credit " << ConsumedBytesInCredit << "/" << CreditBytes
9799
);
98100
auto it = InProcessQuotaRequestCookies.find(ev->Get()->RequestCookie);
99-
Y_ABORT_UNLESS(it != InProcessQuotaRequestCookies.end());
101+
PQ_ENSURE(it != InProcessQuotaRequestCookies.end());
100102
InProcessQuotaRequestCookies.erase(it);
101103

102104
if (!QuotaRequestInFlight) {
@@ -125,14 +127,14 @@ void TBasicAccountQuoter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, cons
125127
LimiterDescription() << "Got quota from Kesus:" << ev->Get()->Result << ". Cookie: " << cookie
126128
);
127129

128-
Y_ABORT_UNLESS(CurrentQuotaRequestCookie == cookie);
130+
PQ_ENSURE(CurrentQuotaRequestCookie == cookie);
129131
if (!Queue.empty()) {
130132
ApproveQuota(Queue.front().Request, Queue.front().StartWait, ctx);
131133
Queue.pop_front();
132134
}
133135

134136
if (Y_UNLIKELY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Success)) {
135-
Y_ABORT_UNLESS(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Deadline); // We set deadline == inf in quota request.
137+
PQ_ENSURE(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Deadline); // We set deadline == inf in quota request.
136138
if (ctx.Now() - LastReportedErrorTime > TDuration::Minutes(1)) {
137139
LOG_ERROR_S(ctx, NKikimrServices::PQ_RATE_LIMITER, LimiterDescription() << "Got quota request error: " << ev->Get()->Result);
138140
LastReportedErrorTime = ctx.Now();
@@ -240,7 +242,7 @@ TQuoterParams TAccountWriteQuoter::CreateQuoterParams(
240242
) {
241243
TQuoterParams params;
242244
const auto& quotingConfig = pqConfig.GetQuotingConfig();
243-
Y_ABORT_UNLESS(quotingConfig.GetTopicWriteQuotaEntityToLimit() != NKikimrPQ::TPQConfig::TQuotingConfig::UNSPECIFIED);
245+
AFL_ENSURE(quotingConfig.GetTopicWriteQuotaEntityToLimit() != NKikimrPQ::TPQConfig::TQuotingConfig::UNSPECIFIED);
244246
auto topicPath = topicConverter->GetFederationPath();
245247

246248
auto topicParts = SplitPath(topicPath); // account/folder/topic // account is first element
@@ -282,5 +284,15 @@ constexpr NKikimrServices::TActivity::EType TAccountWriteQuoter::ActorActivityTy
282284
return NKikimrServices::TActivity::PERSQUEUE_ACCOUNT_WRITE_QUOTER;
283285
}
284286

287+
bool TBasicAccountQuoter::OnUnhandledException(const std::exception& exc) {
288+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::PERSQUEUE,
289+
LimiterDescription() << " unhandled exception " << TypeName(exc) << ": " << exc.what() << Endl
290+
<< TBackTrace::FromCurrentException().PrintToString());
291+
292+
Send(TabletActor, new TEvents::TEvPoison());
293+
PassAway();
294+
return true;
295+
}
296+
285297
}// NPQ
286298
}// NKikimr

0 commit comments

Comments
 (0)