Skip to content

Commit 9604678

Browse files
authored
Merge 091940c into fddc556
2 parents fddc556 + 091940c commit 9604678

19 files changed

+268
-253
lines changed

ydb/core/persqueue/pqrb/mirror_describer.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include <google/protobuf/util/message_differencer.h>
66

// File-local assertion helper: forwards the condition to AFL_ENSURE and attaches
// a "topic" field whose value is TopicName. TopicName is resolved at the point of
// use, so the macro only works where that name is in scope (TMirrorDescriber
// declares a TopicName member).
// NOTE(review): AFL_ENSURE presumably reports failure by throwing rather than
// aborting the process — confirm against its definition.
7+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("topic", TopicName)
8+
79
using namespace NPersQueue;
810

911
namespace NKikimr {
@@ -80,7 +82,7 @@ void TMirrorDescriber::DescribeTopic(const TActorContext& ctx) {
8082
}
8183

8284
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
83-
Y_ABORT_UNLESS(factory);
85+
PQ_ENSURE(factory);
8486
auto future = factory->GetTopicDescription(Config, CredentialsProvider);
8587
future.Subscribe(
8688
[
@@ -114,7 +116,7 @@ void TMirrorDescriber::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*
114116
CredentialsProvider = nullptr;
115117

116118
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
117-
Y_ABORT_UNLESS(factory);
119+
PQ_ENSURE(factory);
118120
auto future = factory->GetCredentialsProvider(Config.GetCredentials());
119121
future.Subscribe(
120122
[
@@ -174,6 +176,24 @@ TString TMirrorDescriber::GetCurrentState() const {
174176
return "UNKNOWN";
175177
}
176178

// Exception hook declared with `override` in TMirrorDescriber, which derives from
// IActorExceptionHandler.
// Logs the exception's type name, what() text and a backtrace at CRIT level under
// the PQ_MIRROR_DESCRIBER service, sends TEvPoison to ReadBalancerActorId, calls
// PassAway() on this actor and returns true.
// NOTE(review): returning true presumably tells the actor runtime the exception
// was handled — confirm against IActorExceptionHandler.
179+
bool TMirrorDescriber::OnUnhandledException(const std::exception& exc) {
180+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::PQ_MIRROR_DESCRIBER,
181+
LogDescription() << "unhandled exception " << TypeName(exc) << ": " << exc.what() << Endl
182+
<< TBackTrace::FromCurrentException().PrintToString());
183+
// The read balancer is poisoned before this actor stops itself.
// NOTE(review): presumably so the balancer does not keep running without its describer — confirm.
184+
Send(ReadBalancerActorId, new TEvents::TEvPoison());
185+
PassAway();
186+
return true;
187+
}
188+
// Factory for the mirror describer actor: allocates a TMirrorDescriber with `new`,
// forwarding the three arguments unchanged, and returns it as NActors::IActor*.
// In this commit read_balancer.cpp passes the result straight to ctx.Register().
// NOTE(review): ownership of the returned pointer presumably transfers to the
// actor system on registration — confirm.
189+
NActors::IActor* CreateMirrorDescriber(
190+
const NActors::TActorId& readBalancerActorId,
191+
const TString& topicName,
192+
const NKikimrPQ::TMirrorPartitionConfig& config
193+
) {
194+
return new TMirrorDescriber(readBalancerActorId, topicName, config);
195+
}
196+
177197

178198
}// NPQ
179199
}// NKikimr

ydb/core/persqueue/pqrb/mirror_describer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
namespace NKikimr::NPQ {
1515

1616

17-
class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber> {
17+
class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber>
18+
, public IActorExceptionHandler {
1819
private:
1920
static constexpr TDuration INIT_INTERVAL_MAX = TDuration::Seconds(240);
2021
static constexpr TDuration INIT_INTERVAL_START = TDuration::Seconds(1);
@@ -76,7 +77,7 @@ class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber> {
7677
void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx);
7778
void HandleWakeup(const TActorContext& ctx);
7879
void HandleDescriptionResult(TEvPQ::TEvMirrorTopicDescription::TPtr& ev, const TActorContext& ctx);
79-
80+
bool OnUnhandledException(const std::exception&) override;
8081
private:
8182
TActorId ReadBalancerActorId;
8283
TString TopicName;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <ydb/core/persqueue/public/config.h>
4+
#include <ydb/library/actors/core/actorsystem_fwd.h>
5+
6+
namespace NKikimr::NPQ {
7+
// Declaration of the mirror describer factory. It exposes only NActors::IActor*,
// so a caller can create the actor without including the TMirrorDescriber class
// definition (read_balancer.cpp switches its include to this header in this commit).
// Parameters: the read balancer's actor id, the topic name, and the mirror
// partition config.
8+
NActors::IActor* CreateMirrorDescriber(
9+
const NActors::TActorId& readBalancerActorId,
10+
const TString& topicName,
11+
const NKikimrPQ::TMirrorPartitionConfig& config
12+
);
13+
14+
}

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "read_balancer__txpreinit.h"
44
#include "read_balancer__txwrite.h"
55
#include "read_balancer_log.h"
6-
#include "mirror_describer.h"
6+
#include "mirror_describer_factory.h"
77

88
#include <ydb/core/persqueue/events/internal.h>
99
#include <ydb/core/protos/counters_pq.pb.h>
@@ -13,6 +13,8 @@
1313
#include <library/cpp/string_utils/base64/base64.h>
1414
#include <library/cpp/random_provider/random_provider.h>
1515

// File-local assertion helper: forwards the condition to AFL_ENSURE and attaches
// three fields — "tablet_id" (TabletID()), "path" (Path) and "topic" (Topic).
// These names are resolved at the point of use, so the macro only works where
// TabletID(), Path and Topic are all in scope.
// NOTE(review): AFL_ENSURE presumably reports failure by throwing rather than
// aborting the process — confirm against its definition.
16+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("tablet_id", TabletID())("path", Path)("topic", Topic)
17+
1618
namespace NKikimr {
1719
namespace NPQ {
1820

@@ -289,7 +291,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
289291
if (MirrorTopicDescriberActorId) {
290292
ctx.Send(MirrorTopicDescriberActorId, new TEvPQ::TEvChangePartitionConfig(nullptr, TabletConfig));
291293
} else {
292-
MirrorTopicDescriberActorId = ctx.Register(new TMirrorDescriber(SelfId(), Topic, TabletConfig.GetPartitionConfig().GetMirrorFrom()));
294+
MirrorTopicDescriberActorId = ctx.Register(CreateMirrorDescriber(SelfId(), Topic, TabletConfig.GetPartitionConfig().GetMirrorFrom()));
293295
}
294296
} else {
295297
if (MirrorTopicDescriberActorId) {
@@ -320,7 +322,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
320322
for (auto& p : record.GetPartitions()) {
321323
auto it = PartitionsInfo.find(p.GetPartition());
322324
if (it == PartitionsInfo.end()) {
323-
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
325+
PQ_ENSURE(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
324326

325327
partitionsInfo[p.GetPartition()] = {p.GetTabletId()};
326328

@@ -422,7 +424,7 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor
422424
pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
423425
TabletPipes[tabletId].PipeActor = pipeClient;
424426
auto res = PipesRequested.insert(tabletId);
425-
Y_ABORT_UNLESS(res.second);
427+
PQ_ENSURE(res.second);
426428
} else {
427429
pipeClient = it->second.PipeActor;
428430
}

ydb/core/persqueue/pqrb/read_balancer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class TMetricsTimeKeeper {
5454
};
5555

5656

57-
class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTabletExecutedFlat {
57+
class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>,
58+
public TTabletExecutedFlat {
5859
struct TTxPreInit;
5960
struct TTxInit;
6061
struct TTxWrite;

ydb/core/persqueue/pqrb/read_balancer__txinit.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
3333
return false;
3434

3535
while (!dataRowset.EndOfSet()) { //found out topic info
36-
Y_ABORT_UNLESS(!Self->Inited);
36+
AFL_ENSURE(!Self->Inited)("tablet_id", Self->TabletID());
3737
Self->PathId = dataRowset.GetValue<Schema::Data::PathId>();
3838
Self->Topic = dataRowset.GetValue<Schema::Data::Topic>();
3939
Self->Path = dataRowset.GetValue<Schema::Data::Path>();
@@ -50,7 +50,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
5050
TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>("");
5151
if (!config.empty()) {
5252
bool res = Self->TabletConfig.ParseFromString(config);
53-
Y_ABORT_UNLESS(res);
53+
AFL_ENSURE(res)("tablet_id", Self->TabletID())("path", Self->Path)("topic", Self->Topic);
5454

5555
Migrate(Self->TabletConfig);
5656
Self->Consumers.clear();

ydb/core/persqueue/pqrb/read_balancer__txwrite.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ struct TPersQueueReadBalancer::TTxWrite : public ITransaction {
4848
NIceDb::TNiceDb db(txc.DB);
4949
TString config;
5050
bool res = Self->TabletConfig.SerializeToString(&config);
51-
Y_ABORT_UNLESS(res);
51+
AFL_ENSURE(res)("tablet_id", Self->TabletID())("path", Self->Path)("topic", Self->Topic);
5252
db.Table<Schema::Data>().Key(1).Update(
5353
NIceDb::TUpdate<Schema::Data::PathId>(Self->PathId),
5454
NIceDb::TUpdate<Schema::Data::Topic>(Self->Topic),

ydb/core/persqueue/pqtablet/partition/mirrorer.cpp

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
#include <util/string/join.h>
1717

// File-local assertion helper: forwards the condition to AFL_ENSURE and attaches
// "tablet_id" (TabletId) and "partition_id" (Partition). Both names are resolved
// at the point of use; TMirrorer declares them as members, and TabletId is the
// member introduced by this commit.
// NOTE(review): AFL_ENSURE presumably reports failure by throwing rather than
// aborting the process — confirm against its definition.
18+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("tablet_id", TabletId)("partition_id", Partition)
19+
1820
using namespace NPersQueue;
1921

2022
namespace NKikimr {
@@ -27,6 +29,7 @@ constexpr NKikimrServices::TActivity::EType TMirrorer::ActorActivityType() {
2729
}
2830

2931
TMirrorer::TMirrorer(
32+
ui64 tabletId,
3033
TActorId tabletActor,
3134
TActorId partitionActor,
3235
const NPersQueue::TTopicConverterPtr& topicConverter,
@@ -36,7 +39,8 @@ TMirrorer::TMirrorer(
3639
const NKikimrPQ::TMirrorPartitionConfig& config,
3740
const TTabletCountersBase& counters
3841
)
39-
: TabletActor(tabletActor)
42+
: TabletId(tabletId)
43+
, TabletActor(tabletActor)
4044
, PartitionActor(partitionActor)
4145
, TopicConverter(topicConverter)
4246
, Partition(partition)
@@ -160,8 +164,8 @@ void TMirrorer::ProcessError(const TActorContext& ctx, const TString& msg, const
160164
}
161165

162166
void TMirrorer::AfterSuccesWrite(const TActorContext& ctx) {
163-
Y_ABORT_UNLESS(WriteInFlight.empty());
164-
Y_ABORT_UNLESS(WriteRequestInFlight);
167+
PQ_ENSURE(WriteInFlight.empty());
168+
PQ_ENSURE(WriteRequestInFlight);
165169
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
166170
<< " written " << WriteRequestInFlight.value().CmdWriteSize()
167171
<< " messages with first offset=" << WriteRequestInFlight.value().GetCmdWriteOffset()
@@ -203,7 +207,7 @@ void TMirrorer::ProcessWriteResponse(
203207
MirrorerTimeLags->IncFor(lag.MilliSeconds(), 1);
204208
}
205209
ui64 offset = writtenMessageInfo.GetOffset();
206-
Y_ABORT_UNLESS((ui64)result.GetOffset() == offset);
210+
PQ_ENSURE((ui64)result.GetOffset() == offset);
207211
Y_VERIFY_S(EndOffset <= offset, MirrorerDescription()
208212
<< "end offset more the written " << EndOffset << ">" << offset);
209213
EndOffset = offset + 1;
@@ -417,7 +421,7 @@ void TMirrorer::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*ev*/, c
417421
CredentialsProvider = nullptr;
418422

419423
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
420-
Y_ABORT_UNLESS(factory);
424+
PQ_ENSURE(factory);
421425
auto future = factory->GetCredentialsProvider(Config.GetCredentials());
422426
future.Subscribe(
423427
[
@@ -457,7 +461,7 @@ void TMirrorer::HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev,
457461
}
458462

459463
void TMirrorer::RetryWrite(const TActorContext& ctx) {
460-
Y_ABORT_UNLESS(WriteRequestInFlight);
464+
PQ_ENSURE(WriteRequestInFlight);
461465

462466
THolder<TEvPersQueue::TEvRequest> request = MakeHolder<TEvPersQueue::TEvRequest>();
463467
auto req = request->Record.MutablePartitionRequest();
@@ -485,7 +489,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
485489
OffsetToRead = Queue.front().GetOffset();
486490
while (!Queue.empty()) {
487491
ui64 dataSize = Queue.back().GetData().size();
488-
Y_ABORT_UNLESS(BytesInFlight >= dataSize);
492+
PQ_ENSURE(BytesInFlight >= dataSize);
489493
BytesInFlight -= dataSize;
490494
Queue.pop_back();
491495
}
@@ -497,7 +501,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
497501
PartitionStream.Reset();
498502

499503
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
500-
Y_ABORT_UNLESS(factory);
504+
PQ_ENSURE(factory);
501505

502506
TLog log(MakeHolder<TDeferredActorLogBackend>(
503507
factory->GetSharedActorSystem(),
@@ -554,7 +558,7 @@ void TMirrorer::TryUpdateWriteTimetsamp(const TActorContext &ctx) {
554558
void TMirrorer::AddMessagesToQueue(std::vector<TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage>&& messages) {
555559
for (auto& msg : messages) {
556560
ui64 offset = msg.GetOffset();
557-
Y_ABORT_UNLESS(OffsetToRead <= offset);
561+
PQ_ENSURE(OffsetToRead <= offset);
558562
ui64 messageSize = msg.GetData().size();
559563

560564
Counters.Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Increment(messageSize);
@@ -730,15 +734,26 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
730734
ConsumerInitInterval = CONSUMER_INIT_INTERVAL_START;
731735
}
732736

733-
NActors::IActor* CreateMirrorer(const NActors::TActorId& tabletActor,
// Exception hook declared with `override` in TMirrorer, which derives from
// IActorExceptionHandler.
// Logs the exception's type name, what() text and a backtrace at CRIT level under
// the PQ_MIRRORER service, sends TEvPoison to TabletActor, calls PassAway() on
// this actor and returns true.
// NOTE(review): returning true presumably tells the actor runtime the exception
// was handled — confirm against IActorExceptionHandler.
737+
bool TMirrorer::OnUnhandledException(const std::exception& exc) {
738+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::PQ_MIRRORER,
739+
MirrorerDescription() << "unhandled exception " << TypeName(exc) << ": " << exc.what() << Endl
740+
<< TBackTrace::FromCurrentException().PrintToString());
741+
// The owning tablet actor is poisoned before this actor stops itself.
// NOTE(review): presumably so the tablet restarts rather than running without a mirrorer — confirm.
742+
Send(TabletActor, new TEvents::TEvPoison());
743+
PassAway();
744+
return true;
745+
}
746+
// Factory for the partition mirrorer actor: allocates a TMirrorer with `new` and
// returns it as NActors::IActor*. This commit adds tabletId as the new first
// parameter and forwards it to the constructor, which stores it in TabletId.
// In this diff view the line after "741-" is the removed pre-change return
// statement; the line after "756+" is the one now in effect.
747+
NActors::IActor* CreateMirrorer(const ui64 tabletId,
748+
const NActors::TActorId& tabletActor,
734749
const NActors::TActorId& partitionActor,
735750
const NPersQueue::TTopicConverterPtr& topicConverter,
736751
const ui32 partition,
737752
const bool localDC,
738753
const ui64 endOffset,
739754
const NKikimrPQ::TMirrorPartitionConfig& config,
740755
const TTabletCountersBase& counters) {
741-
return new TMirrorer(tabletActor, partitionActor, topicConverter, partition, localDC, endOffset, config, counters);
756+
return new TMirrorer(tabletId, tabletActor, partitionActor, topicConverter, partition, localDC, endOffset, config, counters);
742757
}
743758

744759
}// NPQ

ydb/core/persqueue/pqtablet/partition/mirrorer.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
namespace NKikimr {
1919
namespace NPQ {
2020

21-
class TMirrorer : public TActorBootstrapped<TMirrorer> {
21+
class TMirrorer : public TActorBootstrapped<TMirrorer>
22+
, public IActorExceptionHandler {
2223
private:
2324
const ui64 MAX_READ_FUTURES_STORE = 25;
2425
const ui64 MAX_BYTES_IN_FLIGHT = 16_MB;
@@ -121,6 +122,7 @@ class TMirrorer : public TActorBootstrapped<TMirrorer> {
121122
public:
122123
static constexpr NKikimrServices::TActivity::EType ActorActivityType();
123124
TMirrorer(
125+
ui64 tabletId,
124126
TActorId tabletActor,
125127
TActorId partitionActor,
126128
const NPersQueue::TTopicConverterPtr& topicConverter,
@@ -151,12 +153,14 @@ class TMirrorer : public TActorBootstrapped<TMirrorer> {
151153
);
152154
void StartWaitNextReaderEvent(const TActorContext& ctx);
153155

156+
bool OnUnhandledException(const std::exception&) override;
154157
private:
155-
TActorId TabletActor;
156-
TActorId PartitionActor;
157-
NPersQueue::TTopicConverterPtr TopicConverter;
158-
ui32 Partition;
159-
bool IsLocalDC;
158+
const ui64 TabletId;
159+
const TActorId TabletActor;
160+
const TActorId PartitionActor;
161+
const NPersQueue::TTopicConverterPtr TopicConverter;
162+
const ui32 Partition;
163+
const bool IsLocalDC;
160164
ui64 EndOffset;
161165
ui64 OffsetToRead;
162166
NKikimrPQ::TMirrorPartitionConfig Config;

ydb/core/persqueue/pqtablet/partition/mirrorer_factory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ namespace NPQ {
2020

2121
class TPartitionId;
2222

23-
NActors::IActor* CreateMirrorer(const NActors::TActorId& tabletActor,
23+
NActors::IActor* CreateMirrorer(const ui64 tabletId,
24+
const NActors::TActorId& tabletActor,
2425
const NActors::TActorId& partitionActor,
2526
const NPersQueue::TTopicConverterPtr& topicConverter,
2627
const ui32 partition,

0 commit comments

Comments
 (0)