Skip to content

Commit 84c70d1

Browse files
PQRB added consumers counter
1 parent bff00ae commit 84c70d1

File tree

3 files changed

+125
-5
lines changed

3 files changed

+125
-5
lines changed

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,14 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset,
190190
}
191191

192192
void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId,
193-
TTestContext& context, const bool requireAuth, bool kill) {
194-
PQBalancerPrepare(topic, map, ssId, *context.Runtime, context.BalancerTabletId, context.Edge, requireAuth, kill);
193+
TTestContext& context, const bool requireAuth, bool kill, const THashSet<TString>& xtraConsumers) {
194+
PQBalancerPrepare(topic, map, ssId, *context.Runtime, context.BalancerTabletId, context.Edge, requireAuth, kill,
195+
xtraConsumers);
195196
}
196197

197198
void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId,
198-
TTestActorRuntime& runtime, ui64 balancerTabletId, TActorId edge, const bool requireAuth, bool kill) {
199+
TTestActorRuntime& runtime, ui64 balancerTabletId, TActorId edge, const bool requireAuth, bool kill,
200+
const THashSet<TString>& xtraConsumers) {
199201
TAutoPtr<IEventHandle> handle;
200202
static int version = 0;
201203
++version;
@@ -227,6 +229,9 @@ void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::p
227229
request->Record.SetPath("/Root/" + topic);
228230
request->Record.SetSchemeShardId(ssId);
229231
request->Record.MutableTabletConfig()->AddReadRules("client");
232+
for (const auto& c : xtraConsumers) {
233+
request->Record.MutableTabletConfig()->AddReadRules(c);
234+
};
230235
request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth);
231236
request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth);
232237

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ void PQBalancerPrepare(
277277
ui64 tabletId,
278278
TActorId edge,
279279
const bool requireAuth = false,
280-
bool kill = true);
280+
bool kill = true,
281+
const THashSet<TString>& xtraConsumers = {});
281282

282283
void PQTabletRestart(
283284
TTestActorRuntime& runtime,
@@ -300,7 +301,8 @@ void PQBalancerPrepare(
300301
const ui64 ssId,
301302
TTestContext& context,
302303
const bool requireAuth = false,
303-
bool kill = true);
304+
bool kill = true,
305+
const THashSet<TString>& xtraConsumers = {});
304306

305307
void PQTabletRestart(TTestContext& context);
306308

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,119 @@ Y_UNIT_TEST(ImportantFlagSwitching) {
559559
});
560560
}
561561

562+
Y_UNIT_TEST(NewConsumersCounters) {
563+
SetEnv("FAST_UT", "1");
564+
TTestContext tc;
565+
RunTestWithReboots(tc.TabletIds, [&]() {
566+
return tc.InitialEventsFilter.Prepare();
567+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
568+
TFinalizer finalizer(tc);
569+
activeZone = false;
570+
bool dbRegistered{false};
571+
bool labeledCountersReceived =false ;
572+
573+
tc.Prepare(dispatchName, setup, activeZone, true, true, true);
574+
575+
tc.Runtime->SetScheduledLimit(10000);
576+
577+
tc.Runtime->SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
578+
if (event->GetTypeRewrite() == NSysView::TEvSysView::EvRegisterDbCounters) {
579+
auto database = event.Get()->Get<NSysView::TEvSysView::TEvRegisterDbCounters>()->Database;
580+
UNIT_ASSERT_VALUES_EQUAL(database, "/Root/PQ");
581+
dbRegistered = true;
582+
} else if (event->GetTypeRewrite() == TEvTabletCounters::EvTabletAddLabeledCounters) {
583+
labeledCountersReceived = true;
584+
}
585+
return TTestActorRuntime::DefaultObserverFunc(event);
586+
});
587+
PQTabletPrepare({.deleteTime=3600, .writeSpeed = 100_KB, .meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS}, {{"client", true}}, tc);
588+
TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
589+
ui64 ssId = 325;
590+
BootFakeSchemeShard(*tc.Runtime, ssId, state);
591+
592+
PQBalancerPrepare("topic", {{0, {tc.TabletId, 1}}}, ssId, tc, false, false, {"user1", "user2"});
593+
594+
IActor* actor = CreateTabletCountersAggregator(false);
595+
auto aggregatorId = tc.Runtime->Register(actor);
596+
tc.Runtime->EnableScheduleForActor(aggregatorId);
597+
598+
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
599+
CmdWrite(0, "sourceid1", TestData(), tc, false);
600+
CmdWrite(0, "sourceid2", TestData(), tc, false);
601+
PQGetPartInfo(0, 30, tc);
602+
603+
{
604+
TDispatchOptions options;
605+
options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
606+
tc.Runtime->DispatchEvents(options);
607+
}
608+
{
609+
NSchemeCache::TDescribeResult::TPtr result = new NSchemeCache::TDescribeResult{};
610+
result->SetPath("/Root");
611+
TVector<TString> attrs = {"folder_id", "cloud_id", "database_id"};
612+
for (auto& attr : attrs) {
613+
auto ua = result->MutablePathDescription()->AddUserAttributes();
614+
ua->SetKey(attr);
615+
ua->SetValue(attr);
616+
}
617+
NSchemeCache::TDescribeResult::TCPtr cres = result;
618+
auto event = MakeHolder<TEvTxProxySchemeCache::TEvWatchNotifyUpdated>(0, "/Root", TPathId{}, cres);
619+
TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries());
620+
tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, event.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
621+
622+
TDispatchOptions options;
623+
options.FinalEvents.emplace_back(TEvTxProxySchemeCache::EvWatchNotifyUpdated);
624+
auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
625+
UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
626+
}
627+
{
628+
TDispatchOptions options;
629+
options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats);
630+
auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
631+
UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
632+
}
633+
634+
{
635+
auto counters = tc.Runtime->GetAppData(0).Counters;
636+
auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);
637+
638+
auto group = dbGroup->GetSubgroup("host", "")
639+
->GetSubgroup("database", "/Root")
640+
->GetSubgroup("cloud_id", "cloud_id")
641+
->GetSubgroup("folder_id", "folder_id")
642+
->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
643+
for (const auto& user : {"client", "user1", "user2"}) {
644+
auto consumerSG = group->FindSubgroup("consumer", user);
645+
UNIT_ASSERT_C(consumerSG, user);
646+
}
647+
}
648+
PQBalancerPrepare("topic", {{0, {tc.TabletId, 1}}}, ssId, tc, false, false, {"user3", "user2"});
649+
650+
{
651+
TDispatchOptions options;
652+
options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats);
653+
auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
654+
UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
655+
}
656+
657+
{
658+
auto counters = tc.Runtime->GetAppData(0).Counters;
659+
auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);
660+
661+
auto group = dbGroup->GetSubgroup("host", "")
662+
->GetSubgroup("database", "/Root")
663+
->GetSubgroup("cloud_id", "cloud_id")
664+
->GetSubgroup("folder_id", "folder_id")
665+
->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
666+
for (const auto& user : {"user1", "user3"}) {
667+
auto consumerSG = group->FindSubgroup("consumer", user);
668+
UNIT_ASSERT_C(consumerSG, user);
669+
}
670+
}
671+
672+
});
673+
}
674+
562675
} // Y_UNIT_TEST_SUITE(PQCountersLabeled)
563676

564677
Y_UNIT_TEST_SUITE(TMultiBucketCounter) {

0 commit comments

Comments
 (0)