Skip to content

Commit adf239a

Browse files
authored
Merge fd52261 into 77b77cc
2 parents 77b77cc + fd52261 commit adf239a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1052
-105
lines changed

ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,18 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
118118

119119
TPoolSettings resourcePoolSettings;
120120
auto& properties = *resourcePoolDescription.MutableProperties()->MutableProperties();
121-
for (const auto& [property, setting] : GetPropertiesMap(resourcePoolSettings, true)) {
121+
for (const auto& [property, setting] : resourcePoolSettings.GetPropertiesMap(true)) {
122122
if (std::optional<TString> value = featuresExtractor.Extract(property)) {
123123
try {
124-
std::visit(TSettingsParser{*value}, setting);
124+
std::visit(TPoolSettings::TParser{*value}, setting);
125125
} catch (...) {
126126
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
127127
}
128128
} else if (!featuresExtractor.ExtractResetFeature(property)) {
129129
continue;
130130
}
131131

132-
TString value = std::visit(TSettingsExtractor(), setting);
132+
const TString value = std::visit(TPoolSettings::TExtractor(), setting);
133133
properties.insert({property, value});
134134
}
135135

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#include "behaviour.h"
2+
#include "initializer.h"
3+
#include "manager.h"
4+
5+
6+
namespace NKikimr::NKqp {
7+
8+
TResourcePoolClassifierBehaviour::TFactory::TRegistrator<TResourcePoolClassifierBehaviour> TResourcePoolClassifierBehaviour::Registrator(TResourcePoolClassifierConfig::GetTypeId());
9+
10+
NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolClassifierBehaviour::ConstructInitializer() const {
11+
return std::make_shared<TResourcePoolClassifierInitializer>();
12+
}
13+
14+
NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolClassifierBehaviour::ConstructOperationsManager() const {
15+
return std::make_shared<TResourcePoolClassifierManager>();
16+
}
17+
18+
TString TResourcePoolClassifierBehaviour::GetInternalStorageTablePath() const {
19+
return "resource_pools/resource_pools_classifiers";
20+
}
21+
22+
TString TResourcePoolClassifierBehaviour::GetTypeId() const {
23+
return TResourcePoolClassifierConfig::GetTypeId();
24+
}
25+
26+
NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierBehaviour::GetInstance() {
27+
static std::shared_ptr<TResourcePoolClassifierBehaviour> result = std::make_shared<TResourcePoolClassifierBehaviour>();
28+
return result;
29+
}
30+
31+
} // namespace NKikimr::NKqp
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include "object.h"
4+
5+
#include <ydb/services/metadata/abstract/initialization.h>
6+
#include <ydb/services/metadata/abstract/kqp_common.h>
7+
8+
9+
namespace NKikimr::NKqp {
10+
11+
class TResourcePoolClassifierBehaviour : public NMetadata::TClassBehaviour<TResourcePoolClassifierConfig> {
12+
static TFactory::TRegistrator<TResourcePoolClassifierBehaviour> Registrator;
13+
14+
protected:
15+
virtual NMetadata::NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
16+
virtual NMetadata::NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
17+
virtual TString GetInternalStorageTablePath() const override;
18+
19+
public:
20+
virtual TString GetTypeId() const override;
21+
22+
static IClassBehaviour::TPtr GetInstance();
23+
};
24+
25+
} // namespace NKikimr::NKqp
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
#include "checker.h"
2+
3+
#include <ydb/core/cms/console/configs_dispatcher.h>
4+
#include <ydb/core/kqp/workload_service/actors/actors.h>
5+
#include <ydb/core/kqp/workload_service/common/events.h>
6+
#include <ydb/core/protos/console_config.pb.h>
7+
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>
8+
9+
#include <ydb/library/query_actor/query_actor.h>
10+
11+
12+
namespace NKikimr::NKqp {
13+
14+
namespace {
15+
16+
using namespace NActors;
17+
using namespace NResourcePool;
18+
using namespace NWorkload;
19+
20+
21+
class TRanksCheckerActor : public NKikimr::TQueryBase {
22+
using TBase = NKikimr::TQueryBase;
23+
24+
public:
25+
TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
26+
: TBase(NKikimrServices::KQP_GATEWAY, sessionId)
27+
, Database(database)
28+
, RanksToCheck(ranksToCheck)
29+
{
30+
TxId = transactionId;
31+
SetOperationInfo(__func__, Database);
32+
}
33+
34+
void OnRunQuery() override {
35+
const auto& tablePath = TResourcePoolClassifierConfig::GetBehaviour()->GetStorageTablePath();
36+
37+
TStringBuilder sql = TStringBuilder() << R"(
38+
-- TRanksCheckerActor::OnRunQuery
39+
DECLARE $database AS Text;
40+
)";
41+
42+
NYdb::TParamsBuilder params;
43+
params
44+
.AddParam("$database")
45+
.Utf8(Database)
46+
.Build();
47+
48+
if (!RanksToCheck.empty()) {
49+
sql << R"(
50+
DECLARE $ranks AS List<Int64>;
51+
PRAGMA AnsiInForEmptyOrNullableItemsCollections;
52+
53+
SELECT
54+
rank, name
55+
FROM `)" << tablePath << R"(`
56+
WHERE database = $database
57+
AND rank IN $ranks;
58+
)";
59+
60+
auto& param = params.AddParam("$ranks").BeginList();
61+
for (const auto& [rank, _] : RanksToCheck) {
62+
param.AddListItem().Int64(rank);
63+
}
64+
param.EndList().Build();
65+
66+
ExpectedResultSets++;
67+
}
68+
69+
sql << R"(
70+
SELECT
71+
MAX(rank) AS MaxRank,
72+
COUNT(*) AS NumberClassifiers
73+
FROM `)" << tablePath << R"(`
74+
WHERE database = $database;
75+
)";
76+
77+
RunDataQuery(sql, &params, TTxControl::ContinueTx());
78+
}
79+
80+
void OnQueryResult() override {
81+
if (ResultSets.size() != ExpectedResultSets) {
82+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
83+
return;
84+
}
85+
86+
ui64 resultSetId = 0;
87+
if (!RanksToCheck.empty()) {
88+
NYdb::TResultSetParser result(ResultSets[resultSetId++]);
89+
while (result.TryNextRow()) {
90+
TMaybe<i64> rank = result.ColumnParser("rank").GetOptionalInt64();
91+
if (!rank) {
92+
continue;
93+
}
94+
95+
TMaybe<TString> name = result.ColumnParser("name").GetOptionalUtf8();
96+
if (!name) {
97+
continue;
98+
}
99+
100+
if (auto it = RanksToCheck.find(*rank); it != RanksToCheck.end() && it->second != *name) {
101+
Finish(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Classifier with rank " << *rank << " already exists, its name " << *name);
102+
return;
103+
}
104+
}
105+
}
106+
107+
{ // Classifiers stats
108+
NYdb::TResultSetParser result(ResultSets[resultSetId++]);
109+
if (!result.TryNextRow()) {
110+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
111+
return;
112+
}
113+
114+
MaxRank = result.ColumnParser("MaxRank").GetOptionalInt64().GetOrElse(0);
115+
NumberClassifiers = result.ColumnParser("NumberClassifiers").GetUint64();
116+
}
117+
118+
Finish();
119+
}
120+
121+
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
122+
Send(Owner, new TEvPrivate::TEvRanksCheckerResponse(status, MaxRank, NumberClassifiers, std::move(issues)));
123+
}
124+
125+
private:
126+
const TString Database;
127+
const std::unordered_map<i64, TString> RanksToCheck;
128+
129+
ui64 ExpectedResultSets = 1;
130+
i64 MaxRank = 0;
131+
ui64 NumberClassifiers = 0;
132+
};
133+
134+
class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResourcePoolClassifierPreparationActor> {
135+
public:
136+
TResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext)
137+
: Context(context)
138+
, AlterContext(alterContext)
139+
, Controller(std::move(controller))
140+
, PatchedObjects(std::move(patchedObjects))
141+
{}
142+
143+
void Bootstrap() {
144+
Become(&TResourcePoolClassifierPreparationActor::StateFunc);
145+
ValidateRanks();
146+
GetDatabaseInfo();
147+
}
148+
149+
void Handle(TEvPrivate::TEvRanksCheckerResponse::TPtr& ev) {
150+
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
151+
FailAndPassAway("Resource pool classifier ranks check failed", ev->Get()->Status, ev->Get()->Issues);
152+
return;
153+
}
154+
155+
if (ev->Get()->NumberClassifiers >= CLASSIFIER_COUNT_LIMIT) {
156+
FailAndPassAway(TStringBuilder() << "Number of resource pool classifiers reached limit in " << CLASSIFIER_COUNT_LIMIT);
157+
return;
158+
}
159+
160+
i64 maxRank = ev->Get()->MaxRank;
161+
for (auto& object : PatchedObjects) {
162+
if (object.GetRank() != -1) {
163+
continue;
164+
}
165+
if (maxRank > std::numeric_limits<i64>::max() - CLASSIFIER_RANK_OFFSET) {
166+
FailAndPassAway(TStringBuilder() << "The rank could not be set automatically, the maximum rank of the resource pool classifier is too high: " << ev->Get()->MaxRank);
167+
return;
168+
}
169+
170+
maxRank += CLASSIFIER_RANK_OFFSET;
171+
object.SetRank(maxRank);
172+
}
173+
174+
RanksChecked = true;
175+
TryFinish();
176+
}
177+
178+
void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
179+
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
180+
FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues);
181+
return;
182+
}
183+
184+
Serverless = ev->Get()->Serverless;
185+
186+
Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvGetConfigRequest(
187+
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
188+
), IEventHandle::FlagTrackDelivery);
189+
}
190+
191+
void Handle(TEvents::TEvUndelivered::TPtr& ev) {
192+
switch (ev->Get()->SourceType) {
193+
case NConsole::TEvConfigsDispatcher::EvGetConfigRequest:
194+
CheckFeatureFlag(AppData()->FeatureFlags);
195+
break;
196+
197+
default:
198+
break;
199+
}
200+
}
201+
202+
void Handle(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse::TPtr& ev) {
203+
CheckFeatureFlag(ev->Get()->Config->GetFeatureFlags());
204+
}
205+
206+
STRICT_STFUNC(StateFunc,
207+
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
208+
hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
209+
hFunc(TEvents::TEvUndelivered, Handle);
210+
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
211+
)
212+
213+
private:
214+
void GetDatabaseInfo() const {
215+
const auto& externalContext = Context.GetExternalData();
216+
const auto userToken = externalContext.GetUserToken() ? MakeIntrusive<NACLib::TUserToken>(*externalContext.GetUserToken()) : nullptr;
217+
Register(CreateDatabaseFetcherActor(SelfId(), externalContext.GetDatabase(), userToken, NACLib::EAccessRights::GenericFull));
218+
}
219+
220+
void ValidateRanks() {
221+
if (Context.GetActivityType() == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
222+
RanksChecked = true;
223+
TryFinish();
224+
return;
225+
}
226+
227+
std::unordered_map<i64, TString> ranksToNames;
228+
for (const auto& object : PatchedObjects) {
229+
const auto rank = object.GetRank();
230+
if (rank == -1) {
231+
continue;
232+
}
233+
if (!ranksToNames.insert({rank, object.GetName()}).second) {
234+
FailAndPassAway(TStringBuilder() << "Found duplicate rank " << rank);
235+
}
236+
}
237+
238+
Register(new TQueryRetryActor<TRanksCheckerActor, TEvPrivate::TEvRanksCheckerResponse, TString, TString, TString, std::unordered_map<i64, TString>>(
239+
SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.SessionId, AlterContext.TransactionId, ranksToNames
240+
));
241+
}
242+
243+
void CheckFeatureFlag(const NKikimrConfig::TFeatureFlags& featureFlags) {
244+
if (!featureFlags.GetEnableResourcePools()) {
245+
FailAndPassAway("Resource pools classifiers are disabled. Please contact your system administrator to enable it");
246+
return;
247+
}
248+
if (Serverless && !featureFlags.GetEnableResourcePoolsOnServerless()) {
249+
FailAndPassAway("Resource pools classifiers are disabled for serverless domains. Please contact your system administrator to enable it");
250+
return;
251+
}
252+
253+
FeatureFlagChecked = true;
254+
TryFinish();
255+
}
256+
257+
void FailAndPassAway(const TString& message, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
258+
FailAndPassAway(TStringBuilder() << message << ", status: " << status << ", reason: " << issues.ToOneLineString());
259+
}
260+
261+
void FailAndPassAway(const TString& message) {
262+
Controller->OnPreparationProblem(message);
263+
PassAway();
264+
}
265+
266+
void TryFinish() {
267+
if (!FeatureFlagChecked || !RanksChecked) {
268+
return;
269+
}
270+
271+
Controller->OnPreparationFinished(std::move(PatchedObjects));
272+
PassAway();
273+
}
274+
275+
private:
276+
const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& Context;
277+
const NMetadata::NModifications::TAlterOperationContext& AlterContext;
278+
279+
bool Serverless = false;
280+
bool FeatureFlagChecked = false;
281+
bool RanksChecked = false;
282+
283+
NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr Controller;
284+
std::vector<TResourcePoolClassifierConfig> PatchedObjects;
285+
};
286+
287+
} // anonymous namespace
288+
289+
IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext) {
290+
return new TResourcePoolClassifierPreparationActor(std::move(patchedObjects), std::move(controller), context, alterContext);
291+
}
292+
293+
} // namespace NKikimr::NKqp
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include "object.h"
4+
5+
#include <ydb/services/metadata/manager/generic_manager.h>
6+
7+
8+
namespace NKikimr::NKqp {
9+
10+
NActors::IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext);
11+
12+
} // namespace NKikimr::NKqp
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#include "fetcher.h"
2+
3+
4+
namespace NKikimr::NKqp {
5+
6+
std::vector<NMetadata::IClassBehaviour::TPtr> TResourcePoolClassifierSnapshotsFetcher::DoGetManagers() const {
7+
return {TResourcePoolClassifierConfig::GetBehaviour()};
8+
}
9+
10+
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)