Skip to content

Commit df3fd33

Browse files
committed
Auto partitioning to PQ_V1 sdk
1 parent 4d0251a commit df3fd33

File tree

3 files changed

+66
-2
lines changed

3 files changed

+66
-2
lines changed

ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,20 @@ TDescribeTopicResult::TDescribeTopicResult(TStatus status, const Ydb::PersQueue:
8686
}
8787

8888
TDescribeTopicResult::TTopicSettings::TTopicSettings(const Ydb::PersQueue::V1::TopicSettings& settings) {
89-
90-
PartitionsCount_ = settings.partitions_count();
9189
RetentionPeriod_ = TDuration::MilliSeconds(settings.retention_period_ms());
9290
SupportedFormat_ = static_cast<EFormat>(settings.supported_format());
9391

92+
if (settings.has_auto_partitioning_settings()) {
93+
PartitionsCount_ = settings.auto_partitioning_settings().min_active_partitions();
94+
MaxPartitionsCount_ = settings.auto_partitioning_settings().max_active_partitions();
95+
StabilizationWindow_ = TDuration::Seconds(settings.auto_partitioning_settings().partition_write_speed().stabilization_window().seconds());
96+
UpUtilizationPercent_ = settings.auto_partitioning_settings().partition_write_speed().up_utilization_percent();
97+
DownUtilizationPercent_ = settings.auto_partitioning_settings().partition_write_speed().down_utilization_percent();
98+
AutoPartitioningStrategy_ = settings.auto_partitioning_settings().strategy();
99+
} else {
100+
PartitionsCount_ = settings.partitions_count();
101+
}
102+
94103
for (const auto& codec : settings.supported_codecs()) {
95104
SupportedCodecs_.push_back(static_cast<ECodec>(codec));
96105
}

ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,34 @@ class TPersQueueClient::TImpl : public TClientImplCommon<TPersQueueClient::TImpl
5454

5555
props.set_partitions_count(settings.PartitionsCount_);
5656

57+
bool autoscalingSettingsDefined = false;
58+
if (settings.MaxPartitionsCount_.Defined()) {
59+
props.mutable_auto_partitioning_settings()->set_max_active_partitions(settings.PartitionsCount_);
60+
autoscalingSettingsDefined = true;
61+
}
62+
if (settings.AutoPartitioningStrategy_.Defined()) {
63+
props.mutable_auto_partitioning_settings()->set_strategy(*settings.AutoPartitioningStrategy_);
64+
autoscalingSettingsDefined = true;
65+
}
66+
if (settings.DownUtilizationPercent_.Defined()) {
67+
props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(*settings.DownUtilizationPercent_);
68+
autoscalingSettingsDefined = true;
69+
}
70+
if (settings.UpUtilizationPercent_.Defined()) {
71+
props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(*settings.UpUtilizationPercent_);
72+
autoscalingSettingsDefined = true;
73+
}
74+
if (settings.StabilizationWindow_.Defined()) {
75+
props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds((*settings.StabilizationWindow_).Seconds());
76+
autoscalingSettingsDefined = true;
77+
}
78+
if (!autoscalingSettingsDefined) {
79+
props.set_partitions_count(settings.PartitionsCount_);
80+
} else {
81+
props.mutable_auto_partitioning_settings()->set_min_active_partitions(settings.PartitionsCount_);
82+
}
83+
84+
5785
props.set_retention_period_ms(settings.RetentionPeriod_.MilliSeconds());
5886
props.set_supported_format(static_cast<Ydb::PersQueue::V1::TopicSettings::Format>(settings.SupportedFormat_));
5987
for (const auto& codec : settings.SupportedCodecs_) {

ydb/public/sdk/cpp/client/ydb_persqueue_public/include/control_plane.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ struct TDescribeTopicResult : public TStatus {
118118
}
119119
GETTER(TMaybe<TRemoteMirrorRule>, RemoteMirrorRule);
120120

121+
GETTER(TMaybe<ui64>, MaxPartitionsCount);
122+
GETTER(TMaybe<TDuration>, StabilizationWindow);
123+
GETTER(TMaybe<ui64>, UpUtilizationPercent);
124+
GETTER(TMaybe<ui64>, DownUtilizationPercent);
125+
GETTER(TMaybe<Ydb::PersQueue::V1::AutoPartitioningStrategy>, AutoPartitioningStrategy);
126+
121127

122128
#undef GETTER
123129

@@ -139,6 +145,12 @@ struct TDescribeTopicResult : public TStatus {
139145
TMaybe<ui32> AbcId_;
140146
TMaybe<TString> AbcSlug_;
141147
TString FederationAccount_;
148+
149+
TMaybe<ui64> MaxPartitionsCount_;
150+
TMaybe<TDuration> StabilizationWindow_;
151+
TMaybe<ui64> UpUtilizationPercent_;
152+
TMaybe<ui64> DownUtilizationPercent_;
153+
TMaybe<Ydb::PersQueue::V1::AutoPartitioningStrategy> AutoPartitioningStrategy_;
142154
};
143155

144156
TDescribeTopicResult(TStatus status, const Ydb::PersQueue::V1::DescribeTopicResult& result);
@@ -192,6 +204,7 @@ struct TReadRuleSettings {
192204
// Settings for topic.
193205
template <class TDerived>
194206
struct TTopicSettings : public TOperationRequestSettings<TDerived> {
207+
friend class TPersQueueClient;
195208

196209
struct TRemoteMirrorRuleSettings {
197210
TRemoteMirrorRuleSettings() {}
@@ -267,9 +280,23 @@ struct TTopicSettings : public TOperationRequestSettings<TDerived> {
267280
if (settings.RemoteMirrorRule()) {
268281
RemoteMirrorRule_ = TRemoteMirrorRuleSettings().SetSettings(settings.RemoteMirrorRule().GetRef());
269282
}
283+
284+
MaxPartitionsCount_ = settings.MaxPartitionsCount();
285+
StabilizationWindow_ = settings.StabilizationWindow();
286+
UpUtilizationPercent_ = settings.UpUtilizationPercent();
287+
DownUtilizationPercent_ = settings.DownUtilizationPercent();
288+
AutoPartitioningStrategy_ = settings.AutoPartitioningStrategy();
289+
270290
return static_cast<TDerived&>(*this);
271291
}
272292

293+
private:
294+
TMaybe<ui64> MaxPartitionsCount_;
295+
TMaybe<TDuration> StabilizationWindow_;
296+
TMaybe<ui64> UpUtilizationPercent_;
297+
TMaybe<ui64> DownUtilizationPercent_;
298+
TMaybe<Ydb::PersQueue::V1::AutoPartitioningStrategy> AutoPartitioningStrategy_;
299+
273300
};
274301

275302

0 commit comments

Comments
 (0)