Skip to content

Commit 3fbd1cc

Browse files
authored
YQ-3333 added yql syntax for creating resource pools (#5498)
1 parent 0dc827a commit 3fbd1cc

File tree

20 files changed

+553
-5
lines changed

20 files changed

+553
-5
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#include "behaviour.h"
2+
#include "manager.h"
3+
4+
#include <ydb/services/metadata/abstract/initialization.h>
5+
6+
7+
namespace NKikimr::NKqp {
8+
9+
TResourcePoolBehaviour::TFactory::TRegistrator<TResourcePoolBehaviour> TResourcePoolBehaviour::Registrator(TResourcePoolConfig::GetTypeId());
10+
11+
NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolBehaviour::ConstructInitializer() const {
12+
return nullptr;
13+
}
14+
15+
NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolBehaviour::ConstructOperationsManager() const {
16+
return std::make_shared<TResourcePoolManager>();
17+
}
18+
19+
TString TResourcePoolBehaviour::GetInternalStorageTablePath() const {
20+
return TResourcePoolConfig::GetTypeId();
21+
}
22+
23+
24+
TString TResourcePoolBehaviour::GetTypeId() const {
25+
return TResourcePoolConfig::GetTypeId();
26+
}
27+
28+
} // namespace NKikimr::NKqp
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#pragma once
2+
3+
#include <ydb/services/metadata/abstract/kqp_common.h>
4+
5+
6+
namespace NKikimr::NKqp {
7+
8+
class TResourcePoolConfig {
9+
public:
10+
static TString GetTypeId() {
11+
return "RESOURCE_POOL";
12+
}
13+
};
14+
15+
class TResourcePoolBehaviour: public NMetadata::TClassBehaviour<TResourcePoolConfig> {
16+
static TFactory::TRegistrator<TResourcePoolBehaviour> Registrator;
17+
18+
protected:
19+
virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override;
20+
virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override;
21+
virtual TString GetInternalStorageTablePath() const override;
22+
23+
public:
24+
virtual TString GetTypeId() const override;
25+
};
26+
27+
} // namespace NKikimr::NKqp
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#include "manager.h"
2+
3+
#include <ydb/core/base/appdata_fwd.h>
4+
#include <ydb/core/base/feature_flags.h>
5+
6+
7+
namespace NKikimr::NKqp {
8+
9+
namespace {
10+
11+
void CheckFeatureFlag(TResourcePoolManager::TInternalModificationContext& context) {
12+
auto* actorSystem = context.GetExternalData().GetActorSystem();
13+
if (!actorSystem) {
14+
ythrow yexception() << "This place needs an actor system. Please contact internal support";
15+
}
16+
17+
if (!AppData(actorSystem)->FeatureFlags.GetEnableResourcePools()) {
18+
throw std::runtime_error("Resource pools are disabled. Please contact your system administrator to enable it");
19+
}
20+
}
21+
22+
void ValidateObjectId(const TString& objectId) {
23+
if (objectId.find('/') != TString::npos) {
24+
throw std::runtime_error("Resource pool id should not contain '/' symbol");
25+
}
26+
}
27+
28+
TResourcePoolManager::TYqlConclusionStatus StatusFromActivityType(TResourcePoolManager::EActivityType activityType) {
29+
using TYqlConclusionStatus = TResourcePoolManager::TYqlConclusionStatus;
30+
using EActivityType = TResourcePoolManager::EActivityType;
31+
32+
switch (activityType) {
33+
case EActivityType::Undefined:
34+
return TYqlConclusionStatus::Fail("Undefined operation for RESOURCE_POOL object");
35+
case EActivityType::Upsert:
36+
return TYqlConclusionStatus::Fail("Upsert operation for RESOURCE_POOL objects is not implemented");
37+
case EActivityType::Create:
38+
return TYqlConclusionStatus::Fail("Create operation for RESOURCE_POOL objects is not implemented");
39+
case EActivityType::Alter:
40+
return TYqlConclusionStatus::Fail("Alter operation for RESOURCE_POOL objects is not implemented");
41+
case EActivityType::Drop:
42+
return TYqlConclusionStatus::Fail("Drop operation for RESOURCE_POOL objects is not implemented");
43+
}
44+
}
45+
46+
} // anonymous namespace
47+
48+
NThreading::TFuture<TResourcePoolManager::TYqlConclusionStatus> TResourcePoolManager::DoModify(const NYql::TObjectSettingsImpl& settings, ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const {
49+
Y_UNUSED(nodeId, manager);
50+
51+
try {
52+
CheckFeatureFlag(context);
53+
ValidateObjectId(settings.GetObjectId());
54+
55+
return NThreading::MakeFuture<TYqlConclusionStatus>(StatusFromActivityType(context.GetActivityType()));
56+
} catch (...) {
57+
return NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Fail(CurrentExceptionMessage()));
58+
}
59+
}
60+
61+
TResourcePoolManager::TYqlConclusionStatus TResourcePoolManager::DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const {
62+
Y_UNUSED(schemeOperation, manager);
63+
64+
try {
65+
CheckFeatureFlag(context);
66+
ValidateObjectId(settings.GetObjectId());
67+
68+
return StatusFromActivityType(context.GetActivityType());
69+
} catch (...) {
70+
return TYqlConclusionStatus::Fail(CurrentExceptionMessage());
71+
}
72+
}
73+
74+
NThreading::TFuture<TResourcePoolManager::TYqlConclusionStatus> TResourcePoolManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const {
75+
Y_UNUSED(nodeId, manager, context);
76+
77+
return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast<i32>(schemeOperation.GetOperationCase())));
78+
}
79+
80+
} // namespace NKikimr::NKqp
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <ydb/services/metadata/manager/abstract.h>
4+
5+
6+
namespace NKikimr::NKqp {
7+
8+
class TResourcePoolManager : public NMetadata::NModifications::IOperationsManager {
9+
public:
10+
using NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus;
11+
12+
protected:
13+
NThreading::TFuture<TYqlConclusionStatus> DoModify(const NYql::TObjectSettingsImpl& settings, ui32 nodeId,
14+
const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const override;
15+
16+
TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
17+
const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override;
18+
19+
public:
20+
21+
NThreading::TFuture<TYqlConclusionStatus> ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
22+
const ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;
23+
};
24+
25+
} // namespace NKikimr::NKqp
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
manager.cpp
5+
GLOBAL behaviour.cpp
6+
)
7+
8+
PEERDIR(
9+
ydb/services/metadata/abstract
10+
ydb/services/metadata/manager
11+
)
12+
13+
YQL_LAST_ABI_VERSION()
14+
15+
END()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
RECURSE(
22
external_data_source
3+
resource_pool
34
table
45
tablestore
56
)

ydb/core/kqp/gateway/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ PEERDIR(
1717
ydb/core/kqp/gateway/behaviour/tablestore
1818
ydb/core/kqp/gateway/behaviour/table
1919
ydb/core/kqp/gateway/behaviour/external_data_source
20+
ydb/core/kqp/gateway/behaviour/resource_pool
2021
ydb/core/kqp/gateway/behaviour/view
2122
ydb/core/kqp/gateway/utils
2223
ydb/library/yql/providers/result/expr_nodes

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5988,6 +5988,51 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
59885988
UNIT_ASSERT_EQUAL_C(describe.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
59895989
}
59905990
}
5991+
5992+
Y_UNIT_TEST(DisableResourcePools) {
5993+
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(false));
5994+
auto db = kikimr.GetTableClient();
5995+
auto session = db.CreateSession().GetValueSync().GetSession();
5996+
5997+
auto checkDisabled = [&session](const TString& query) {
5998+
Cerr << "Check query:\n" << query << "\n";
5999+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
6000+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
6001+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
6002+
};
6003+
6004+
// CREATE RESOURCE POOL
6005+
checkDisabled(R"(
6006+
CREATE RESOURCE POOL `MyResourcePool` WITH (
6007+
CONCURRENT_QUERY_LIMIT=20,
6008+
QUERY_CANCEL_AFTER_SECONDS=86400,
6009+
QUERY_COUNT_LIMIT=1000
6010+
);)");
6011+
6012+
// ALTER RESOURCE POOL
6013+
checkDisabled(R"(
6014+
ALTER RESOURCE POOL `MyResourcePool`
6015+
SET (CONCURRENT_QUERY_LIMIT = 30),
6016+
SET QUERY_COUNT_LIMIT 100,
6017+
RESET (QUERY_CANCEL_AFTER_SECONDS);
6018+
)");
6019+
6020+
// DROP RESOURCE POOL
6021+
checkDisabled("DROP RESOURCE POOL `MyResourcePool`;");
6022+
}
6023+
6024+
Y_UNIT_TEST(ResourcePoolsValidation) {
6025+
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(true));
6026+
auto db = kikimr.GetTableClient();
6027+
auto session = db.CreateSession().GetValueSync().GetSession();
6028+
6029+
auto result = session.ExecuteSchemeQuery(R"(
6030+
CREATE RESOURCE POOL `MyFolder/MyResourcePool` WITH (
6031+
CONCURRENT_QUERY_LIMIT=20
6032+
);)").GetValueSync();
6033+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
6034+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pool id should not contain '/' symbol");
6035+
}
59916036
}
59926037

59936038
Y_UNIT_TEST_SUITE(KqpOlapScheme) {

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,5 @@ message TFeatureFlags {
141141
optional bool EnableExternalSourceSchemaInference = 126 [default = false];
142142
optional bool EnableDbMetadataCache = 127 [default = false];
143143
optional bool EnableTableDatetime64 = 128 [default = false];
144+
optional bool EnableResourcePools = 129 [default = false];
144145
}

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class TTestFeatureFlagsHolder {
5959
FEATURE_FLAG_SETTER(EnableReplaceIfExistsForExternalEntities)
6060
FEATURE_FLAG_SETTER(EnableCMSRequestPriorities)
6161
FEATURE_FLAG_SETTER(EnableTableDatetime64)
62+
FEATURE_FLAG_SETTER(EnableResourcePools)
6263

6364
#undef FEATURE_FLAG_SETTER
6465
};

0 commit comments

Comments
 (0)