Skip to content

Commit 7042e11

Browse files
authored
Merge 3721c57 into def1b93
2 parents def1b93 + 3721c57 commit 7042e11

14 files changed

+88
-53
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
769769
auto& userInfo = userInfoPair.second;
770770
if (!userInfo.LabeledCounters)
771771
continue;
772-
if (!userInfo.HasReadRule && !userInfo.Important)
772+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
773773
continue;
774774
auto* cac = ac->AddConsumerAggregatedCounters();
775775
cac->SetConsumer(userInfo.User);
@@ -1124,7 +1124,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
11241124
auto& userInfo = userInfoPair.second;
11251125
if (!userInfo.LabeledCounters)
11261126
continue;
1127-
if (!userInfo.HasReadRule && !userInfo.Important)
1127+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
11281128
continue;
11291129
bool haveChanges = false;
11301130
userInfo.EndOffset = EndOffset;
@@ -1228,6 +1228,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
12281228
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
12291229
}
12301230
}
1231+
1232+
if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) {
1233+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get());
1234+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get());
1235+
}
1236+
12311237
if (haveChanges) {
12321238
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
12331239
}
@@ -1339,6 +1345,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
13391345
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
13401346
}
13411347
}
1348+
1349+
if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) {
1350+
ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
1351+
if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
1352+
haveChanges = true;
1353+
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
1354+
}
1355+
}
13421356
return haveChanges;
13431357
}
13441358

@@ -1853,7 +1867,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18531867
if (LastOffsetHasBeenCommited(userInfo)) {
18541868
SendReadingFinished(user);
18551869
}
1856-
} else {
1870+
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
18571871
auto ui = UsersInfoStorage->GetIfExists(user);
18581872
if (ui && ui->LabeledCounters) {
18591873
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());

ydb/core/persqueue/partition_init.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -883,7 +883,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
883883
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
884884
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
885885
if (IsQuotingEnabled()) {
886-
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
886+
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
887887
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
888888
new NKikimr::NPQ::TPercentileCounter(
889889
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
@@ -896,7 +896,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
896896
subgroups.pop_back();
897897
}
898898

899-
subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
899+
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
900900
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
901901
new NKikimr::NPQ::TPercentileCounter(
902902
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ Y_UNIT_TEST(PartitionWriteQuota) {
138138
TStringStream histogramStr;
139139
histogram->OutputHtml(histogramStr);
140140
Cerr << "**** Total histogram: **** \n " << histogramStr.Str() << "**** **** **** ****" << Endl;
141-
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "1000ms")->Val(), 3);
142-
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 1);
141+
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "0ms")->Val(),2);
142+
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 5);
143143
}
144144
}
145145

ydb/core/persqueue/ut/resources/counters_datastreams.html

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,15 @@
2525
bin=60000: 0
2626
bin=999999: 0
2727

28-
name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
29-
bin=0: 30
30-
bin=1: 0
31-
bin=10: 0
32-
bin=100: 0
33-
bin=1000: 0
34-
bin=10000: 0
35-
bin=20: 0
36-
bin=2500: 0
37-
bin=5: 0
38-
bin=50: 0
39-
bin=500: 0
40-
bin=5000: 0
41-
bin=999999: 0
42-
4328
name=topic.write.lag_milliseconds:
4429
bin=100: 0
45-
bin=1000: 10
30+
bin=1000: 0
4631
bin=10000: 0
4732
bin=180000: 0
4833
bin=200: 0
4934
bin=2000: 0
5035
bin=30000: 0
51-
bin=500: 20
36+
bin=500: 30
5237
bin=5000: 0
5338
bin=60000: 0
5439
bin=999999: 0
@@ -68,4 +53,19 @@
6853
bin=5242880: 0
6954
bin=67108864: 0
7055
bin=99999999: 0
56+
57+
name=topic.write.partition_throttled_milliseconds:
58+
bin=0: 30
59+
bin=1: 0
60+
bin=10: 0
61+
bin=100: 0
62+
bin=1000: 0
63+
bin=10000: 0
64+
bin=20: 0
65+
bin=2500: 0
66+
bin=5: 0
67+
bin=50: 0
68+
bin=500: 0
69+
bin=5000: 0
70+
bin=999999: 0
7171
</pre>

ydb/core/persqueue/ut/resources/counters_pqproxy.html

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,26 @@
1414
Account=asdfgs:
1515
Duration=10000ms: 0
1616
Duration=1000ms: 0
17-
Duration=100ms: 0
17+
Duration=100ms: 3
1818
Duration=1500ms: 0
1919
Duration=2000ms: 0
2020
Duration=200ms: 0
2121
Duration=30000ms: 0
2222
Duration=5000ms: 0
23-
Duration=500ms: 3
23+
Duration=500ms: 0
2424
Duration=550ms: 0
2525
Duration=99999999ms: 0
2626

2727
Account=total:
2828
Duration=10000ms: 0
2929
Duration=1000ms: 0
30-
Duration=100ms: 0
30+
Duration=100ms: 3
3131
Duration=1500ms: 0
3232
Duration=2000ms: 0
3333
Duration=200ms: 0
3434
Duration=30000ms: 0
3535
Duration=5000ms: 0
36-
Duration=500ms: 3
36+
Duration=500ms: 0
3737
Duration=550ms: 0
3838
Duration=99999999ms: 0
3939

@@ -543,29 +543,29 @@
543543

544544
sensor=TimeLagsOriginal:
545545
Interval=10000ms: 0
546-
Interval=1000ms: 10
546+
Interval=1000ms: 0
547547
Interval=100ms: 0
548548
Interval=180000ms: 0
549549
Interval=2000ms: 0
550550
Interval=200ms: 0
551551
Interval=30000ms: 0
552552
Interval=5000ms: 0
553-
Interval=500ms: 20
553+
Interval=500ms: 30
554554
Interval=60000ms: 0
555555
Interval=999999ms: 0
556556

557557
OriginDC=cluster:
558558

559559
sensor=TimeLagsOriginal:
560560
Interval=10000ms: 0
561-
Interval=1000ms: 10
561+
Interval=1000ms: 0
562562
Interval=100ms: 0
563563
Interval=180000ms: 0
564564
Interval=2000ms: 0
565565
Interval=200ms: 0
566566
Interval=30000ms: 0
567567
Interval=5000ms: 0
568-
Interval=500ms: 20
568+
Interval=500ms: 30
569569
Interval=60000ms: 0
570570
Interval=999999ms: 0
571571

@@ -577,14 +577,14 @@
577577

578578
sensor=TimeLagsOriginal:
579579
Interval=10000ms: 0
580-
Interval=1000ms: 10
580+
Interval=1000ms: 0
581581
Interval=100ms: 0
582582
Interval=180000ms: 0
583583
Interval=2000ms: 0
584584
Interval=200ms: 0
585585
Interval=30000ms: 0
586586
Interval=5000ms: 0
587-
Interval=500ms: 20
587+
Interval=500ms: 30
588588
Interval=60000ms: 0
589589
Interval=999999ms: 0
590590

@@ -598,14 +598,14 @@
598598

599599
sensor=TimeLagsOriginal:
600600
Interval=10000ms: 0
601-
Interval=1000ms: 10
601+
Interval=1000ms: 0
602602
Interval=100ms: 0
603603
Interval=180000ms: 0
604604
Interval=2000ms: 0
605605
Interval=200ms: 0
606606
Interval=30000ms: 0
607607
Interval=5000ms: 0
608-
Interval=500ms: 20
608+
Interval=500ms: 30
609609
Interval=60000ms: 0
610610
Interval=999999ms: 0
611611

@@ -621,14 +621,14 @@
621621

622622
sensor=TimeLagsOriginal:
623623
Interval=10000ms: 0
624-
Interval=1000ms: 10
624+
Interval=1000ms: 0
625625
Interval=100ms: 0
626626
Interval=180000ms: 0
627627
Interval=2000ms: 0
628628
Interval=200ms: 0
629629
Interval=30000ms: 0
630630
Interval=5000ms: 0
631-
Interval=500ms: 20
631+
Interval=500ms: 30
632632
Interval=60000ms: 0
633633
Interval=999999ms: 0
634634

ydb/core/persqueue/ut/resources/counters_pqproxy_firstclass.html

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,26 @@
1414
Account=federationAccount:
1515
Duration=10000ms: 0
1616
Duration=1000ms: 0
17-
Duration=100ms: 0
17+
Duration=100ms: 3
1818
Duration=1500ms: 0
1919
Duration=2000ms: 0
2020
Duration=200ms: 0
2121
Duration=30000ms: 0
2222
Duration=5000ms: 0
23-
Duration=500ms: 3
23+
Duration=500ms: 0
2424
Duration=550ms: 0
2525
Duration=99999999ms: 0
2626

2727
Account=total:
2828
Duration=10000ms: 0
2929
Duration=1000ms: 0
30-
Duration=100ms: 0
30+
Duration=100ms: 3
3131
Duration=1500ms: 0
3232
Duration=2000ms: 0
3333
Duration=200ms: 0
3434
Duration=30000ms: 0
3535
Duration=5000ms: 0
36-
Duration=500ms: 3
36+
Duration=500ms: 0
3737
Duration=550ms: 0
3838
Duration=99999999ms: 0
3939
</pre>

ydb/core/persqueue/ut/resources/counters_topics.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
name=topic.partition.read.inflight_throttled_microseconds_max: 0
1717
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
1818
name=topic.partition.read.throttled_microseconds_max: 0
19+
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
20+
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
1921
name=topic.partition.storage_bytes_max: 0
2022
name=topic.partition.total_count: 2
2123
name=topic.partition.uptime_milliseconds_min: 30000

ydb/core/protos/counters_pq.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,7 @@ enum EPartitionLabeledCounters {
237237
METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}];
238238

239239
METRIC_READ_INFLIGHT_LIMIT_THROTTLED = 39 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.inflight_throttled_microseconds_max"}];
240+
241+
METRIC_READ_QUOTA_NO_CONSUMER_BYTES = 40 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read_without_consumer.speed_limit_bytes_per_second"}];
242+
METRIC_READ_QUOTA_NO_CONSUMER_USAGE = 41 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read_without_consumer.throttled_microseconds_max"}];
240243
}

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
206206
.UseTopicCommit = OnlyTableInTx,
207207
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
208208
.UseTableUpsert = !OnlyTopicInTx,
209+
.ReadWithoutConsumer = ReadWithoutConsumer,
209210
.CommitPeriod = CommitPeriod,
210211
.CommitMessages = CommitMessages
211212
};

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class TTopicOperationsScenario {
7070
bool OnlyTopicInTx = false;
7171
bool OnlyTableInTx = false;
7272
bool UseTableSelect = true;
73+
bool ReadWithoutConsumer = false;
7374

7475
protected:
7576
void CreateTopic(const TString& database,

0 commit comments

Comments
 (0)