Skip to content

Commit e3a492a

Browse files
authored
Merge 4787e02 into 590c21c
2 parents 590c21c + 4787e02 commit e3a492a

File tree

8 files changed

+35
-16
lines changed

8 files changed

+35
-16
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
748748
auto& userInfo = userInfoPair.second;
749749
if (!userInfo.LabeledCounters)
750750
continue;
751-
if (!userInfo.HasReadRule && !userInfo.Important)
751+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
752752
continue;
753753
auto* cac = ac->AddConsumerAggregatedCounters();
754754
cac->SetConsumer(userInfo.User);
@@ -1083,7 +1083,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
10831083
auto& userInfo = userInfoPair.second;
10841084
if (!userInfo.LabeledCounters)
10851085
continue;
1086-
if (!userInfo.HasReadRule && !userInfo.Important)
1086+
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
10871087
continue;
10881088
bool haveChanges = false;
10891089
userInfo.EndOffset = EndOffset;
@@ -1805,7 +1805,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18051805
} else {
18061806
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
18071807
}
1808-
} else {
1808+
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
18091809
auto ui = UsersInfoStorage->GetIfExists(user);
18101810
if (ui && ui->LabeledCounters) {
18111811
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());

ydb/core/persqueue/partition_init.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
886886
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
887887
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
888888
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
889-
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
889+
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
890890
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
891891
new NKikimr::NPQ::TPercentileCounter(
892892
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
@@ -899,7 +899,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
899899
subgroups.pop_back();
900900
}
901901

902-
subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
902+
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
903903
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
904904
new NKikimr::NPQ::TPercentileCounter(
905905
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",

ydb/core/persqueue/read_balancer.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "read_balancer.h"
22

33
#include <ydb/core/persqueue/events/internal.h>
4+
#include "ydb/core/persqueue/user_info.h"
45
#include <ydb/core/protos/counters_pq.pb.h>
56
#include <ydb/core/base/feature_flags.h>
67
#include <ydb/core/tablet/tablet_exception.h>
@@ -65,6 +66,7 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
6566
for (const auto& rr : Self->TabletConfig.GetReadRules()) {
6667
Self->Consumers[rr];
6768
}
69+
Self->Consumers[CLIENTID_WITHOUT_CONSUMER];
6870
}
6971
Self->Inited = true;
7072
if (!dataRowset.Next())
@@ -536,7 +538,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
536538
Consumers[rr];
537539
}
538540
}
539-
541+
Consumers[CLIENTID_WITHOUT_CONSUMER];
542+
540543
TVector<std::pair<ui32, TPartInfo>> newPartitions;
541544
TVector<ui32> deletedPartitions;
542545
TVector<std::pair<ui64, TTabletInfo>> newTablets;

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
206206
.UseTopicCommit = OnlyTableInTx,
207207
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
208208
.UseTableUpsert = !OnlyTopicInTx,
209+
.ReadWithoutConsumer = ReadWithoutConsumer,
209210
.CommitPeriod = CommitPeriod,
210211
.CommitMessages = CommitMessages
211212
};

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class TTopicOperationsScenario {
7070
bool OnlyTopicInTx = false;
7171
bool OnlyTableInTx = false;
7272
bool UseTableSelect = true;
73+
bool ReadWithoutConsumer = false;
7374

7475
protected:
7576
void CreateTopic(const TString& database,

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,33 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
2424
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
2525
std::optional<TTransactionSupport> txSupport;
2626

27-
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
2827
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
29-
auto consumers = describeTopicResult.GetConsumers();
28+
NYdb::NTopic::TReadSessionSettings settings;
29+
30+
if (!params.ReadWithoutConsumer) {
31+
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
32+
auto consumers = describeTopicResult.GetConsumers();
3033

31-
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
32-
{
33-
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
34-
exit(EXIT_FAILURE);
34+
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
35+
{
36+
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
37+
exit(EXIT_FAILURE);
38+
}
39+
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
40+
} else {
41+
NYdb::NTopic::TTopicReadSettings topic = params.TopicName;
42+
auto partitions = describeTopicResult.GetPartitions();
43+
for(auto partition: partitions) {
44+
topic.AppendPartitionIds(partition.GetPartitionId());
45+
}
46+
settings.WithoutConsumer().AppendTopics(topic);
3547
}
3648

49+
3750
if (params.UseTransactions) {
3851
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
3952
}
4053

41-
NYdb::NTopic::TReadSessionSettings settings;
42-
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
43-
4454
auto readSession = topicClient->CreateReadSession(settings);
4555
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created.");
4656

@@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
93103
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
94104
}
95105

96-
if (!txSupport || params.UseTopicCommit) {
106+
if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) {
97107
dataEvent->Commit();
98108
}
99109
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace NYdb {
3030
bool UseTopicCommit = false;
3131
bool UseTableSelect = true;
3232
bool UseTableUpsert = true;
33+
bool ReadWithoutConsumer = false;
3334
size_t CommitPeriod = 15;
3435
size_t CommitMessages = 1'000'000;
3536
};

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config)
3434
config.Opts->AddLongOption("topic", "Topic name.")
3535
.DefaultValue(TOPIC)
3636
.StoreResult(&Scenario.TopicName);
37+
config.Opts->AddLongOption("no-consumer", "Read without consumer")
38+
.Hidden()
39+
.StoreTrue(&Scenario.ReadWithoutConsumer);
3740

3841
// Specific params
3942
config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")

0 commit comments

Comments
 (0)