Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@

namespace NKikimr::NKqp::NWorkload {

struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
: Database(database)
, PoolId(poolId)
{}

const TString Database;
const TString PoolId;
};

struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: Database(database)
Expand Down Expand Up @@ -80,4 +90,14 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
, Serverless(serverless)
{}

const TString Database;
const bool Serverless;
};

} // NKikimr::NKqp::NWorkload
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
};
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
false, false, std::move(TempTablesState), nullptr, SplitCtx);
false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,18 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso

TPoolSettings resourcePoolSettings;
auto& properties = *resourcePoolDescription.MutableProperties()->MutableProperties();
for (const auto& [property, setting] : GetPropertiesMap(resourcePoolSettings, true)) {
for (const auto& [property, setting] : resourcePoolSettings.GetPropertiesMap(true)) {
if (std::optional<TString> value = featuresExtractor.Extract(property)) {
try {
std::visit(TSettingsParser{*value}, setting);
std::visit(TPoolSettings::TParser{*value}, setting);
} catch (...) {
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
}
} else if (!featuresExtractor.ExtractResetFeature(property)) {
continue;
}

TString value = std::visit(TSettingsExtractor(), setting);
const TString value = std::visit(TPoolSettings::TExtractor(), setting);
properties.insert({property, value});
}

Expand Down Expand Up @@ -241,23 +241,23 @@ void TResourcePoolManager::PrepareCreateResourcePool(NKqpProto::TKqpSchemeOperat
}

auto& schemeTx = *schemeOperation.MutableCreateResourcePool();
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".resource_pools/"}));
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".metadata/workload_manager/pools/"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);

FillResourcePoolDescription(*schemeTx.MutableCreateResourcePool(), settings);
}

void TResourcePoolManager::PrepareAlterResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const {
auto& schemeTx = *schemeOperation.MutableAlterResourcePool();
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".resource_pools/"}));
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".metadata/workload_manager/pools/"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterResourcePool);

FillResourcePoolDescription(*schemeTx.MutableCreateResourcePool(), settings);
}

void TResourcePoolManager::PrepareDropResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const {
auto& schemeTx = *schemeOperation.MutableDropResourcePool();
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".resource_pools/"}));
schemeTx.SetWorkingDir(JoinPath({context.GetExternalData().GetDatabase(), ".metadata/workload_manager/pools/"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropResourcePool);

schemeTx.MutableDrop()->SetName(settings.GetObjectId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "behaviour.h"
#include "initializer.h"
#include "manager.h"


namespace NKikimr::NKqp {

TResourcePoolClassifierBehaviour::TFactory::TRegistrator<TResourcePoolClassifierBehaviour> TResourcePoolClassifierBehaviour::Registrator(TResourcePoolClassifierConfig::GetTypeId());

NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolClassifierBehaviour::ConstructInitializer() const {
return std::make_shared<TResourcePoolClassifierInitializer>();
}

NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolClassifierBehaviour::ConstructOperationsManager() const {
return std::make_shared<TResourcePoolClassifierManager>();
}

TString TResourcePoolClassifierBehaviour::GetInternalStorageTablePath() const {
return "workload_manager/classifiers/resource_pool_classifiers";
}

TString TResourcePoolClassifierBehaviour::GetTypeId() const {
return TResourcePoolClassifierConfig::GetTypeId();
}

NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierBehaviour::GetInstance() {
static std::shared_ptr<TResourcePoolClassifierBehaviour> result = std::make_shared<TResourcePoolClassifierBehaviour>();
return result;
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "object.h"

#include <ydb/services/metadata/abstract/initialization.h>
#include <ydb/services/metadata/abstract/kqp_common.h>


namespace NKikimr::NKqp {

class TResourcePoolClassifierBehaviour : public NMetadata::TClassBehaviour<TResourcePoolClassifierConfig> {
static TFactory::TRegistrator<TResourcePoolClassifierBehaviour> Registrator;

protected:
virtual NMetadata::NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
virtual NMetadata::NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
virtual TString GetInternalStorageTablePath() const override;

public:
virtual TString GetTypeId() const override;

static IClassBehaviour::TPtr GetInstance();
};

} // namespace NKikimr::NKqp
Loading