Skip to content

YQ-3471 support resource pool classifiers #7812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@

namespace NKikimr::NKqp::NWorkload {

struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
: Database(database)
, PoolId(poolId)
{}

const TString Database;
const TString PoolId;
};

struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: Database(database)
Expand Down Expand Up @@ -80,4 +90,14 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
, Serverless(serverless)
{}

const TString Database;
const bool Serverless;
};

} // NKikimr::NKqp::NWorkload
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
};
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
false, false, std::move(TempTablesState), nullptr, SplitCtx);
false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,18 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso

TPoolSettings resourcePoolSettings;
auto& properties = *resourcePoolDescription.MutableProperties()->MutableProperties();
for (const auto& [property, setting] : GetPropertiesMap(resourcePoolSettings, true)) {
for (const auto& [property, setting] : resourcePoolSettings.GetPropertiesMap(true)) {
if (std::optional<TString> value = featuresExtractor.Extract(property)) {
try {
std::visit(TSettingsParser{*value}, setting);
std::visit(TPoolSettings::TParser{*value}, setting);
} catch (...) {
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
}
} else if (!featuresExtractor.ExtractResetFeature(property)) {
continue;
}

TString value = std::visit(TSettingsExtractor(), setting);
const TString value = std::visit(TPoolSettings::TExtractor(), setting);
properties.insert({property, value});
}

Expand Down Expand Up @@ -241,23 +241,23 @@ void TResourcePoolManager::PrepareCreateResourcePool(NKqpProto::TKqpSchemeOperat
}

auto& schemeTx = *schemeOperation.MutableCreateResourcePool();
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".resource_pools/"}));
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".metadata/workload_manager/pools/"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);

FillResourcePoolDescription(*schemeTx.MutableCreateResourcePool(), settings);
}

void TResourcePoolManager::PrepareAlterResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const {
auto& schemeTx = *schemeOperation.MutableAlterResourcePool();
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".resource_pools/"}));
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".metadata/workload_manager/pools/"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterResourcePool);

FillResourcePoolDescription(*schemeTx.MutableCreateResourcePool(), settings);
}

void TResourcePoolManager::PrepareDropResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const {
auto& schemeTx = *schemeOperation.MutableDropResourcePool();
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".resource_pools/"}));
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".metadata/workload_manager/pools/"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropResourcePool);

schemeTx.MutableDrop()->SetName(settings.GetObjectId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "behaviour.h"
#include "initializer.h"
#include "manager.h"


namespace NKikimr::NKqp {

TResourcePoolClassifierBehaviour::TFactory::TRegistrator<TResourcePoolClassifierBehaviour> TResourcePoolClassifierBehaviour::Registrator(TResourcePoolClassifierConfig::GetTypeId());

NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolClassifierBehaviour::ConstructInitializer() const {
return std::make_shared<TResourcePoolClassifierInitializer>();
}

NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolClassifierBehaviour::ConstructOperationsManager() const {
return std::make_shared<TResourcePoolClassifierManager>();
}

TString TResourcePoolClassifierBehaviour::GetInternalStorageTablePath() const {
return "workload_manager/classifiers/resource_pool_classifiers";
}

TString TResourcePoolClassifierBehaviour::GetTypeId() const {
return TResourcePoolClassifierConfig::GetTypeId();
}

NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierBehaviour::GetInstance() {
static std::shared_ptr<TResourcePoolClassifierBehaviour> result = std::make_shared<TResourcePoolClassifierBehaviour>();
return result;
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "object.h"

#include <ydb/services/metadata/abstract/initialization.h>
#include <ydb/services/metadata/abstract/kqp_common.h>


namespace NKikimr::NKqp {

class TResourcePoolClassifierBehaviour : public NMetadata::TClassBehaviour<TResourcePoolClassifierConfig> {
static TFactory::TRegistrator<TResourcePoolClassifierBehaviour> Registrator;

protected:
virtual NMetadata::NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
virtual NMetadata::NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
virtual TString GetInternalStorageTablePath() const override;

public:
virtual TString GetTypeId() const override;

static IClassBehaviour::TPtr GetInstance();
};

} // namespace NKikimr::NKqp
Loading
Loading