Skip to content

Fix config delivery to quoter #15847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4640,7 +4640,6 @@ TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) {
partition,
SelfId(),
TabletID(),
IsLocalDC,
*Counters
));
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/read_quoter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ void TPartitionQuoterBase::StartQuoting(TRequestContext&& context) {
auto& request = context.Request;
auto* accountQuotaTracker = GetAccountQuotaTracker(request);
if (accountQuotaTracker) {

Send(accountQuotaTracker->Actor, new NAccountQuoterEvents::TEvRequest(request->Cookie, request->Request.Release()));
PendingAccountQuotaRequests[request->Cookie] = std::move(context);
} else {
Expand Down Expand Up @@ -115,6 +114,7 @@ void TPartitionQuoterBase::ProcessInflightQueue() {

void TPartitionQuoterBase::HandleConfigUpdate(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
PQTabletConfig = ev->Get()->Config;
TopicConverter = ev->Get()->TopicConverter;
bool totalQuotaUpdated = false;
if (PartitionTotalQuotaTracker.Defined()) {
totalQuotaUpdated = PartitionTotalQuotaTracker->UpdateConfigIfChanged(
Expand Down Expand Up @@ -150,6 +150,8 @@ void TReadQuoter::OnAccountQuotaApproved(TRequestContext&& context) {
}

TAccountQuoterHolder* TReadQuoter::GetAccountQuotaTracker(const THolder<TEvPQ::TEvRequestQuota>& request) {
if (!TopicConverter)
return nullptr;
auto clientId = request->Request->CastAsLocal<TEvPQ::TEvRead>()->ClientId;
return GetOrCreateConsumerQuota(clientId, ActorContext())->AccountQuotaTracker.Get();
}
Expand Down Expand Up @@ -283,6 +285,7 @@ ui64 TReadQuoter::GetTotalPartitionSpeedBurst(const NKikimrPQ::TPQTabletConfig&
THolder<TAccountQuoterHolder> TReadQuoter::CreateAccountQuotaTracker(const TString& user, const TActorContext& ctx) const {
const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig();
TActorId actorId;
Y_ENSURE(TopicConverter);
if (GetTabletActor() && quotingConfig.GetEnableQuoting()) {
if(quotingConfig.GetEnableReadQuoting()) {
actorId = TActivationContext::Register(
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/persqueue/read_quoter.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ class TWriteQuoter : public TPartitionQuoterBase {
const TPartitionId& partition,
TActorId tabletActor,
ui64 tabletId,
bool isLocalDc,
const TTabletCountersBase& counters
);

Expand Down Expand Up @@ -332,8 +331,8 @@ class TWriteQuoter : public TPartitionQuoterBase {
}

private:
bool GetAccountQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig) const;
bool QuotingEnabled;
bool AccountQuotingEnabled;
THolder<TAccountQuoterHolder> AccountQuotaTracker;
};

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ TPartition* TPartitionFixture::CreatePartitionActor(const TPartitionId& id,
id,
Ctx->Edge,
Ctx->TabletId,
Config.GetLocalDC(),
*TabletCounters
));
}
Expand Down
13 changes: 9 additions & 4 deletions ydb/core/persqueue/write_quoter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ TWriteQuoter::TWriteQuoter(
const TPartitionId& partition,
TActorId tabletActor,
ui64 tabletId,
bool isLocalDc,
const TTabletCountersBase& counters
)
: TPartitionQuoterBase(
topicConverter, config, partition, tabletActor, pqConfig.GetQuotingConfig().GetEnableQuoting(),
tabletId, counters, 1
)
, QuotingEnabled(pqConfig.GetQuotingConfig().GetEnableQuoting())
, AccountQuotingEnabled(IsQuotingEnabled(pqConfig, isLocalDc))
{
}

Expand All @@ -44,6 +42,10 @@ void TWriteQuoter::HandleConsumedImpl(TEvPQ::TEvConsumed::TPtr& ev) {
}
}

bool TWriteQuoter::GetAccountQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig) const {
return IsQuotingEnabled(pqConfig, PQTabletConfig.GetLocalDC());
}

void TWriteQuoter::HandleWakeUpImpl() {
}

Expand All @@ -53,7 +55,7 @@ void TWriteQuoter::UpdateQuotaConfigImpl(bool, const TActorContext&) {

THolder<TAccountQuoterHolder> TWriteQuoter::CreateAccountQuotaTracker() const {
TActorId actorId;
if (GetTabletActor() && AccountQuotingEnabled) {
if (GetTabletActor() && GetAccountQuotingEnabled(AppData()->PQConfig)) {
actorId = TActivationContext::Register(
new TAccountWriteQuoter(
GetTabletActor(),
Expand Down Expand Up @@ -97,9 +99,12 @@ IEventBase* TWriteQuoter::MakeQuotaApprovedEvent(TRequestContext& context) {
};

TAccountQuoterHolder* TWriteQuoter::GetAccountQuotaTracker(const THolder<TEvPQ::TEvRequestQuota>&) {
if (!AccountQuotingEnabled && !QuotingEnabled) {
if (!GetAccountQuotingEnabled(AppData()->PQConfig) && !QuotingEnabled) {
return nullptr;
}
if (!TopicConverter)
return nullptr;

if (!AccountQuotaTracker) {
AccountQuotaTracker = CreateAccountQuotaTracker();
}
Expand Down
Loading