Skip to content

Commit 1ae1a9c

Browse files
authored
Merge 2d522e2 into 4b411f4
2 parents 4b411f4 + 2d522e2 commit 1ae1a9c

File tree

6 files changed

+65
-19
lines changed

6 files changed

+65
-19
lines changed

ydb/core/persqueue/read_balancer.cpp

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
547547
}
548548

549549
Balancer->UpdateConfig(newPartitionsIds, deletedPartitions, ctx);
550+
551+
UpdateConfigCounters();
550552
}
551553

552554

@@ -799,50 +801,74 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) {
799801
UpdateCounters(ctx);
800802
}
801803

802-
void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
803-
if (!AggregatedStats.Stats.size())
804+
void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) {
805+
if (!DatabasePath) {
804806
return;
807+
}
805808

806-
if (!DatabasePath)
809+
if (DynamicCounters) {
807810
return;
808-
809-
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
810-
THolder<TPartitionLabeledCounters> labeledCounters;
811-
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
812-
THolder<TConsumerLabeledCounters> labeledConsumerCounters;
813-
814-
815-
labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
816-
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));
817-
818-
auto counters = AppData(ctx)->Counters;
819-
bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
811+
}
820812

821813
TStringBuf name = TStringBuf(Path);
822814
name.SkipPrefix(DatabasePath);
823815
name.SkipPrefix("/");
824-
counters = counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics")
816+
817+
bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
818+
DynamicCounters = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics")
825819
->GetSubgroup("host", "")
826820
->GetSubgroup("database", DatabasePath)
827821
->GetSubgroup("cloud_id", CloudId)
828822
->GetSubgroup("folder_id", FolderId)
829823
->GetSubgroup("database_id", DatabaseId)
830824
->GetSubgroup("topic", TString(name));
831825

826+
ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false);
827+
InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false);
828+
}
829+
830+
void TPersQueueReadBalancer::UpdateConfigCounters() {
831+
if (!DynamicCounters) {
832+
return;
833+
}
834+
835+
size_t inactiveCount = std::count_if(TabletConfig.GetPartitions().begin(), TabletConfig.GetPartitions().end(), [](auto& p) {
836+
return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive;
837+
});
838+
839+
ActivePartitionCountCounter->Set(PartitionsInfo.size() - inactiveCount);
840+
InactivePartitionCountCounter->Set(inactiveCount);
841+
}
842+
843+
void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
844+
if (!AggregatedStats.Stats.size())
845+
return;
846+
847+
if (!DynamicCounters)
848+
return;
849+
850+
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
851+
THolder<TPartitionLabeledCounters> labeledCounters;
852+
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
853+
THolder<TConsumerLabeledCounters> labeledConsumerCounters;
854+
855+
labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
856+
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));
857+
832858
if (AggregatedCounters.empty()) {
833859
for (ui32 i = 0; i < labeledCounters->GetCounters().Size(); ++i) {
834860
TString name = labeledCounters->GetNames()[i];
835861
TStringBuf nameBuf = name;
836862
nameBuf.SkipPrefix("PQ/");
837863
name = nameBuf;
838-
AggregatedCounters.push_back(name.empty() ? nullptr : counters->GetExpiringNamedCounter("name", name, false));
864+
AggregatedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
839865
}
840866
}
841867

842868
for (auto& [consumer, info]: Consumers) {
843869
info.Aggr.Reset(new TTabletLabeledCountersBase{});
844870
if (info.AggregatedCounters.empty()) {
845-
auto clientCounters = counters->GetSubgroup("consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx));
871+
auto clientCounters = DynamicCounters->GetSubgroup("consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx));
846872
for (ui32 i = 0; i < labeledConsumerCounters->GetCounters().Size(); ++i) {
847873
TString name = labeledConsumerCounters->GetNames()[i];
848874
TStringBuf nameBuf = name;
@@ -1106,6 +1132,9 @@ void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated
11061132
if (attr.GetKey() == "cloud_id") CloudId = attr.GetValue();
11071133
if (attr.GetKey() == "database_id") DatabaseId = attr.GetValue();
11081134
}
1135+
1136+
InitCounters(ctx);
1137+
UpdateConfigCounters();
11091138
}
11101139

11111140
if (PartitionsScaleManager) {

ydb/core/persqueue/read_balancer.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
112112
void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&, bool pipeReconnected = false);
113113
void ClosePipe(const ui64 tabletId, const TActorContext&);
114114
void CheckStat(const TActorContext&);
115+
116+
void InitCounters(const TActorContext&);
115117
void UpdateCounters(const TActorContext&);
118+
void UpdateConfigCounters();
116119

117120
void RespondWithACL(
118121
const TEvPersQueue::TEvCheckACL::TPtr &request,
@@ -217,6 +220,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
217220

218221
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters;
219222

223+
NMonitoring::TDynamicCounterPtr DynamicCounters;
224+
NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter;
225+
NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter;
226+
220227
TString DatabasePath;
221228
TString DatabaseId;
222229
TString FolderId;

ydb/core/persqueue/read_balancer__txinit.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
5858
Self->Consumers[consumer.GetName()];
5959
}
6060
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
61+
Self->UpdateConfigCounters();
6162
}
6263
Self->Inited = true;
6364
if (!dataRowset.Next())

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,15 @@ void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::p
209209
part->SetPartition(p.first);
210210
part->SetGroup(p.second.second);
211211
part->SetTabletId(p.second.first);
212+
part->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Active);
212213

213214
auto tablet = request->Record.AddTablets();
214215
tablet->SetTabletId(p.second.first);
215216
tablet->SetOwner(1);
216217
tablet->SetIdx(p.second.first);
218+
219+
auto* pp = request->Record.MutableTabletConfig()->AddPartitions();
220+
pp->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Active);
217221
}
218222
request->Record.SetTxId(12345);
219223
request->Record.SetPathId(1);

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,10 @@ Y_UNIT_TEST(PartitionFirstClass) {
423423
auto counters = tc.Runtime->GetAppData(0).Counters;
424424
auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);
425425

426-
auto group = dbGroup->GetSubgroup("host", "")->GetSubgroup("database", "/Root")->GetSubgroup("cloud_id", "cloud_id")->GetSubgroup("folder_id", "folder_id")
426+
auto group = dbGroup->GetSubgroup("host", "")
427+
->GetSubgroup("database", "/Root")
428+
->GetSubgroup("cloud_id", "cloud_id")
429+
->GetSubgroup("folder_id", "folder_id")
427430
->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
428431
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
429432
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);

ydb/core/persqueue/ut/resources/counters_topics.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
database_id=database_id:
1111

1212
topic=topic:
13+
name=topic.partition.active_count: 1
1314
name=topic.partition.alive_count: 1
15+
name=topic.partition.inactive_count: 0
1416
name=topic.partition.init_duration_milliseconds_max: 0
1517
name=topic.partition.producers_count_max: 3
1618
name=topic.partition.read.inflight_throttled_microseconds_max: 0

0 commit comments

Comments
 (0)