Skip to content

YQ-3492 support resource pool classifiers objects saving #7491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,18 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso

TPoolSettings resourcePoolSettings;
auto& properties = *resourcePoolDescription.MutableProperties()->MutableProperties();
for (const auto& [property, setting] : GetPropertiesMap(resourcePoolSettings, true)) {
for (const auto& [property, setting] : resourcePoolSettings.GetPropertiesMap(true)) {
if (std::optional<TString> value = featuresExtractor.Extract(property)) {
try {
std::visit(TSettingsParser{*value}, setting);
std::visit(TPoolSettings::TParser{*value}, setting);
} catch (...) {
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
}
} else if (!featuresExtractor.ExtractResetFeature(property)) {
continue;
}

TString value = std::visit(TSettingsExtractor(), setting);
const TString value = std::visit(TPoolSettings::TExtractor(), setting);
properties.insert({property, value});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "behaviour.h"
#include "initializer.h"
#include "manager.h"


namespace NKikimr::NKqp {

TResourcePoolClassifierBehaviour::TFactory::TRegistrator<TResourcePoolClassifierBehaviour> TResourcePoolClassifierBehaviour::Registrator(TResourcePoolClassifierConfig::GetTypeId());

NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolClassifierBehaviour::ConstructInitializer() const {
return std::make_shared<TResourcePoolClassifierInitializer>();
}

NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolClassifierBehaviour::ConstructOperationsManager() const {
return std::make_shared<TResourcePoolClassifierManager>();
}

TString TResourcePoolClassifierBehaviour::GetInternalStorageTablePath() const {
return "resource_pools/resource_pools_classifiers";
}

TString TResourcePoolClassifierBehaviour::GetTypeId() const {
return TResourcePoolClassifierConfig::GetTypeId();
}

NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierBehaviour::GetInstance() {
static std::shared_ptr<TResourcePoolClassifierBehaviour> result = std::make_shared<TResourcePoolClassifierBehaviour>();
return result;
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "object.h"

#include <ydb/services/metadata/abstract/initialization.h>
#include <ydb/services/metadata/abstract/kqp_common.h>


namespace NKikimr::NKqp {

class TResourcePoolClassifierBehaviour : public NMetadata::TClassBehaviour<TResourcePoolClassifierConfig> {
static TFactory::TRegistrator<TResourcePoolClassifierBehaviour> Registrator;

protected:
virtual NMetadata::NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
virtual NMetadata::NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
virtual TString GetInternalStorageTablePath() const override;

public:
virtual TString GetTypeId() const override;

static IClassBehaviour::TPtr GetInstance();
};

} // namespace NKikimr::NKqp
300 changes: 300 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
#include "checker.h"

#include <ydb/core/base/path.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/kqp/workload_service/actors/actors.h>
#include <ydb/core/kqp/workload_service/common/events.h>
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>

#include <ydb/library/query_actor/query_actor.h>


namespace NKikimr::NKqp {

namespace {

using namespace NActors;
using namespace NResourcePool;
using namespace NWorkload;


class TRanksCheckerActor : public NKikimr::TQueryBase {
using TBase = NKikimr::TQueryBase;

public:
TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
: TBase(NKikimrServices::KQP_GATEWAY, sessionId)
, Database(database)
, RanksToCheck(ranksToCheck)
{
TxId = transactionId;
SetOperationInfo(__func__, Database);
}

void OnRunQuery() override {
const auto& tablePath = TResourcePoolClassifierConfig::GetBehaviour()->GetStorageTablePath();

TStringBuilder sql = TStringBuilder() << R"(
-- TRanksCheckerActor::OnRunQuery
DECLARE $database AS Text;
)";

NYdb::TParamsBuilder params;
params
.AddParam("$database")
.Utf8(CanonizePath(Database))
.Build();

if (!RanksToCheck.empty()) {
sql << R"(
DECLARE $ranks AS List<Int64>;
PRAGMA AnsiInForEmptyOrNullableItemsCollections;

SELECT
rank, name
FROM `)" << tablePath << R"(`
WHERE database = $database
AND rank IN $ranks;
)";

auto& param = params.AddParam("$ranks").BeginList();
for (const auto& [rank, _] : RanksToCheck) {
param.AddListItem().Int64(rank);
}
param.EndList().Build();

ExpectedResultSets++;
}

sql << R"(
SELECT
MAX(rank) AS MaxRank,
COUNT(*) AS NumberClassifiers
FROM `)" << tablePath << R"(`
WHERE database = $database;
)";

RunDataQuery(sql, &params, TTxControl::ContinueTx());
}

void OnQueryResult() override {
if (ResultSets.size() != ExpectedResultSets) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
}

ui64 resultSetId = 0;
if (!RanksToCheck.empty()) {
NYdb::TResultSetParser result(ResultSets[resultSetId++]);
while (result.TryNextRow()) {
TMaybe<i64> rank = result.ColumnParser("rank").GetOptionalInt64();
if (!rank) {
continue;
}

TMaybe<TString> name = result.ColumnParser("name").GetOptionalUtf8();
if (!name) {
continue;
}

if (auto it = RanksToCheck.find(*rank); it != RanksToCheck.end() && it->second != *name) {
Finish(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Classifier with rank " << *rank << " already exists, its name " << *name);
return;
}
}
}

{ // Classifiers stats
NYdb::TResultSetParser result(ResultSets[resultSetId++]);
if (!result.TryNextRow()) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
}

MaxRank = result.ColumnParser("MaxRank").GetOptionalInt64().GetOrElse(0);
NumberClassifiers = result.ColumnParser("NumberClassifiers").GetUint64();
}

Finish();
}

void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
Send(Owner, new TEvPrivate::TEvRanksCheckerResponse(status, MaxRank, NumberClassifiers, std::move(issues)));
}

private:
const TString Database;
const std::unordered_map<i64, TString> RanksToCheck;

ui64 ExpectedResultSets = 1;
i64 MaxRank = 0;
ui64 NumberClassifiers = 0;
};

class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResourcePoolClassifierPreparationActor> {
public:
TResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext)
: Context(context)
, AlterContext(alterContext)
, Controller(std::move(controller))
, PatchedObjects(std::move(patchedObjects))
{}

void Bootstrap() {
Become(&TResourcePoolClassifierPreparationActor::StateFunc);
ValidateRanks();
GetDatabaseInfo();
}

void Handle(TEvPrivate::TEvRanksCheckerResponse::TPtr& ev) {
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
FailAndPassAway("Resource pool classifier rank check failed", ev->Get()->Status, ev->Get()->Issues);
return;
}

if (ev->Get()->NumberClassifiers >= CLASSIFIER_COUNT_LIMIT) {
FailAndPassAway(TStringBuilder() << "Number of resource pool classifiers reached limit in " << CLASSIFIER_COUNT_LIMIT);
return;
}

i64 maxRank = ev->Get()->MaxRank;
for (auto& object : PatchedObjects) {
if (object.GetRank() != -1) {
continue;
}
if (maxRank > std::numeric_limits<i64>::max() - CLASSIFIER_RANK_OFFSET) {
FailAndPassAway(TStringBuilder() << "The rank could not be set automatically, the maximum rank of the resource pool classifier is too high: " << ev->Get()->MaxRank);
return;
}

maxRank += CLASSIFIER_RANK_OFFSET;
object.SetRank(maxRank);
}

RanksChecked = true;
TryFinish();
}

void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues);
return;
}

Serverless = ev->Get()->Serverless;

Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvGetConfigRequest(
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
), IEventHandle::FlagTrackDelivery);
}

void Handle(TEvents::TEvUndelivered::TPtr& ev) {
switch (ev->Get()->SourceType) {
case NConsole::TEvConfigsDispatcher::EvGetConfigRequest:
CheckFeatureFlag(AppData()->FeatureFlags);
break;

default:
break;
}
}

void Handle(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse::TPtr& ev) {
CheckFeatureFlag(ev->Get()->Config->GetFeatureFlags());
}

STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
)

private:
void GetDatabaseInfo() const {
const auto& externalContext = Context.GetExternalData();
const auto userToken = externalContext.GetUserToken() ? MakeIntrusive<NACLib::TUserToken>(*externalContext.GetUserToken()) : nullptr;
Register(CreateDatabaseFetcherActor(SelfId(), externalContext.GetDatabase(), userToken, NACLib::EAccessRights::GenericFull));
}

void ValidateRanks() {
if (Context.GetActivityType() == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
RanksChecked = true;
TryFinish();
return;
}

std::unordered_map<i64, TString> ranksToNames;
for (const auto& object : PatchedObjects) {
const auto rank = object.GetRank();
if (rank == -1) {
continue;
}
if (!ranksToNames.insert({rank, object.GetName()}).second) {
FailAndPassAway(TStringBuilder() << "Found duplicate rank " << rank);
}
}

Register(new TQueryRetryActor<TRanksCheckerActor, TEvPrivate::TEvRanksCheckerResponse, TString, TString, TString, std::unordered_map<i64, TString>>(
SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.SessionId, AlterContext.TransactionId, ranksToNames
));
}

void CheckFeatureFlag(const NKikimrConfig::TFeatureFlags& featureFlags) {
if (Context.GetActivityType() == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
FeatureFlagChecked = true;
TryFinish();
return;
}

if (!featureFlags.GetEnableResourcePools()) {
FailAndPassAway("Resource pool classifiers are disabled. Please contact your system administrator to enable it");
return;
}
if (Serverless && !featureFlags.GetEnableResourcePoolsOnServerless()) {
FailAndPassAway("Resource pool classifiers are disabled for serverless domains. Please contact your system administrator to enable it");
return;
}

FeatureFlagChecked = true;
TryFinish();
}

void FailAndPassAway(const TString& message, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
FailAndPassAway(TStringBuilder() << message << ", status: " << status << ", reason: " << issues.ToOneLineString());
}

void FailAndPassAway(const TString& message) {
Controller->OnPreparationProblem(message);
PassAway();
}

void TryFinish() {
if (!FeatureFlagChecked || !RanksChecked) {
return;
}

Controller->OnPreparationFinished(std::move(PatchedObjects));
PassAway();
}

private:
const NMetadata::NModifications::IOperationsManager::TInternalModificationContext Context;
const NMetadata::NModifications::TAlterOperationContext AlterContext;

bool Serverless = false;
bool FeatureFlagChecked = false;
bool RanksChecked = false;

NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr Controller;
std::vector<TResourcePoolClassifierConfig> PatchedObjects;
};

} // anonymous namespace

IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext) {
return new TResourcePoolClassifierPreparationActor(std::move(patchedObjects), std::move(controller), context, alterContext);
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include "object.h"

#include <ydb/services/metadata/manager/generic_manager.h>


namespace NKikimr::NKqp {

NActors::IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext);

} // namespace NKikimr::NKqp
Loading
Loading