Skip to content

Commit df51ab2

Browse files
Merge 7962922 into 9480de8
2 parents 9480de8 + 7962922 commit df51ab2

File tree

5 files changed

+14
-9
lines changed

5 files changed

+14
-9
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4621,7 +4621,6 @@ TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) {
46214621
partition,
46224622
SelfId(),
46234623
TabletID(),
4624-
IsLocalDC,
46254624
*Counters
46264625
));
46274626
}

ydb/core/persqueue/read_quoter.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ void TPartitionQuoterBase::StartQuoting(TRequestContext&& context) {
3535
auto& request = context.Request;
3636
auto* accountQuotaTracker = GetAccountQuotaTracker(request);
3737
if (accountQuotaTracker) {
38-
3938
Send(accountQuotaTracker->Actor, new NAccountQuoterEvents::TEvRequest(request->Cookie, request->Request.Release()));
4039
PendingAccountQuotaRequests[request->Cookie] = std::move(context);
4140
} else {
@@ -115,6 +114,7 @@ void TPartitionQuoterBase::ProcessInflightQueue() {
115114

116115
void TPartitionQuoterBase::HandleConfigUpdate(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
117116
PQTabletConfig = ev->Get()->Config;
117+
TopicConverter = ev->Get()->TopicConverter;
118118
bool totalQuotaUpdated = false;
119119
if (PartitionTotalQuotaTracker.Defined()) {
120120
totalQuotaUpdated = PartitionTotalQuotaTracker->UpdateConfigIfChanged(
@@ -150,6 +150,8 @@ void TReadQuoter::OnAccountQuotaApproved(TRequestContext&& context) {
150150
}
151151

152152
TAccountQuoterHolder* TReadQuoter::GetAccountQuotaTracker(const THolder<TEvPQ::TEvRequestQuota>& request) {
153+
if (!TopicConverter)
154+
return nullptr;
153155
auto clientId = request->Request->CastAsLocal<TEvPQ::TEvRead>()->ClientId;
154156
return GetOrCreateConsumerQuota(clientId, ActorContext())->AccountQuotaTracker.Get();
155157
}
@@ -283,6 +285,7 @@ ui64 TReadQuoter::GetTotalPartitionSpeedBurst(const NKikimrPQ::TPQTabletConfig&
283285
THolder<TAccountQuoterHolder> TReadQuoter::CreateAccountQuotaTracker(const TString& user, const TActorContext& ctx) const {
284286
const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig();
285287
TActorId actorId;
288+
Y_ENSURE(TopicConverter);
286289
if (GetTabletActor() && quotingConfig.GetEnableQuoting()) {
287290
if(quotingConfig.GetEnableReadQuoting()) {
288291
actorId = TActivationContext::Register(

ydb/core/persqueue/read_quoter.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ class TWriteQuoter : public TPartitionQuoterBase {
297297
const TPartitionId& partition,
298298
TActorId tabletActor,
299299
ui64 tabletId,
300-
bool isLocalDc,
301300
const TTabletCountersBase& counters
302301
);
303302

@@ -332,8 +331,8 @@ class TWriteQuoter : public TPartitionQuoterBase {
332331
}
333332

334333
private:
334+
bool GetAccountQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig) const;
335335
bool QuotingEnabled;
336-
bool AccountQuotingEnabled;
337336
THolder<TAccountQuoterHolder> AccountQuotaTracker;
338337
};
339338

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,6 @@ TPartition* TPartitionFixture::CreatePartitionActor(const TPartitionId& id,
368368
id,
369369
Ctx->Edge,
370370
Ctx->TabletId,
371-
Config.GetLocalDC(),
372371
*TabletCounters
373372
));
374373
}

ydb/core/persqueue/write_quoter.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@ TWriteQuoter::TWriteQuoter(
1010
const TPartitionId& partition,
1111
TActorId tabletActor,
1212
ui64 tabletId,
13-
bool isLocalDc,
1413
const TTabletCountersBase& counters
1514
)
1615
: TPartitionQuoterBase(
1716
topicConverter, config, partition, tabletActor, pqConfig.GetQuotingConfig().GetEnableQuoting(),
1817
tabletId, counters, 1
1918
)
2019
, QuotingEnabled(pqConfig.GetQuotingConfig().GetEnableQuoting())
21-
, AccountQuotingEnabled(IsQuotingEnabled(pqConfig, isLocalDc))
2220
{
2321
}
2422

@@ -44,6 +42,10 @@ void TWriteQuoter::HandleConsumedImpl(TEvPQ::TEvConsumed::TPtr& ev) {
4442
}
4543
}
4644

45+
bool TWriteQuoter::GetAccountQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig) const {
46+
return IsQuotingEnabled(pqConfig, PQTabletConfig.GetLocalDC());
47+
}
48+
4749
void TWriteQuoter::HandleWakeUpImpl() {
4850
}
4951

@@ -53,7 +55,7 @@ void TWriteQuoter::UpdateQuotaConfigImpl(bool, const TActorContext&) {
5355

5456
THolder<TAccountQuoterHolder> TWriteQuoter::CreateAccountQuotaTracker() const {
5557
TActorId actorId;
56-
if (GetTabletActor() && AccountQuotingEnabled) {
58+
if (GetTabletActor() && GetAccountQuotingEnabled(AppData()->PQConfig)) {
5759
actorId = TActivationContext::Register(
5860
new TAccountWriteQuoter(
5961
GetTabletActor(),
@@ -97,9 +99,12 @@ IEventBase* TWriteQuoter::MakeQuotaApprovedEvent(TRequestContext& context) {
9799
};
98100

99101
TAccountQuoterHolder* TWriteQuoter::GetAccountQuotaTracker(const THolder<TEvPQ::TEvRequestQuota>&) {
100-
if (!AccountQuotingEnabled && !QuotingEnabled) {
102+
if (!GetAccountQuotingEnabled(AppData()->PQConfig) && !QuotingEnabled) {
101103
return nullptr;
102104
}
105+
if (!TopicConverter)
106+
return nullptr;
107+
103108
if (!AccountQuotaTracker) {
104109
AccountQuotaTracker = CreateAccountQuotaTracker();
105110
}

0 commit comments

Comments
 (0)