Skip to content

Commit a9f89f4

Browse files
authored
Merge 428a2a8 into f9ed913
2 parents f9ed913 + 428a2a8 commit a9f89f4

15 files changed

+67
-25
lines changed

ydb/core/kqp/common/events/workload_service.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
4+
#include <ydb/core/resource_pools/resource_pool_settings.h>
45

56
#include <ydb/library/aclib/aclib.h>
67
#include <ydb/library/actors/core/event_local.h>
@@ -24,12 +25,14 @@ struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestInto
2425
};
2526

2627
struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqpWorkloadServiceEvents::EvContinueRequest> {
27-
explicit TEvContinueRequest(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
28+
TEvContinueRequest(Ydb::StatusIds::StatusCode status, const NResourcePool::TPoolSettings& poolConfig, NYql::TIssues issues = {})
2829
: Status(status)
30+
, PoolConfig(poolConfig)
2931
, Issues(std::move(issues))
3032
{}
3133

3234
const Ydb::StatusIds::StatusCode Status;
35+
const NResourcePool::TPoolSettings PoolConfig;
3336
const NYql::TIssues Issues;
3437
};
3538

ydb/core/kqp/common/events/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ PEERDIR(
1515
ydb/core/grpc_services/cancelation
1616
ydb/core/kqp/common/shutdown
1717
ydb/core/kqp/common/compilation
18+
ydb/core/resource_pools
1819

1920
ydb/library/yql/dq/actors
2021
ydb/public/api/protos

ydb/core/kqp/common/kqp_user_request_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include <util/generic/fwd.h>
55
#include <contrib/libs/protobuf/src/google/protobuf/map.h>
66

7+
#include <ydb/core/resource_pools/resource_pool_settings.h>
8+
79
namespace NKikimr::NKqp {
810

911
struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
@@ -13,6 +15,7 @@ namespace NKikimr::NKqp {
1315
TString CurrentExecutionId;
1416
TString CustomerSuppliedId;
1517
TString PoolId;
18+
NResourcePool::TPoolSettings PoolConfig;
1619

1720
TUserRequestContext() = default;
1821

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
481481
}
482482

483483
LOG_D("continue request, pool id: " << poolId);
484+
QueryState->UserRequestContext->PoolConfig = ev->Get()->PoolConfig;
484485
CompileQuery();
485486
}
486487

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
33
#include <ydb/core/kqp/ut/common/columnshard.h>
44
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
5+
#include <ydb/core/resource_pools/resource_pool_settings.h>
56
#include <ydb/core/testlib/common_helper.h>
67
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
78
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
@@ -238,7 +239,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
238239

239240
Y_UNIT_TEST(ExecuteQueryWithWorkloadManager) {
240241
NWorkload::TWorkloadManagerConfig workloadManagerConfig;
241-
workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()});
242+
workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()});
242243
SetWorkloadManagerConfig(workloadManagerConfig);
243244

244245
NKikimrConfig::TAppConfig config;

ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/kqp/counters/kqp_counters.h>
22
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
33
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
4+
#include <ydb/core/resource_pools/resource_pool_settings.h>
45
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
56
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
67
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -101,7 +102,7 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
101102

102103
Y_UNIT_TEST(ExecuteScriptWithWorkloadManager) {
103104
NWorkload::TWorkloadManagerConfig workloadManagerConfig;
104-
workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()});
105+
workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()});
105106
SetWorkloadManagerConfig(workloadManagerConfig);
106107

107108
NKikimrConfig::TAppConfig config;

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
258258

259259
void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
260260
LOG_W("Reply continue error " << status << " to " << replyActorId << ": " << message);
261-
Send(replyActorId, new TEvContinueRequest(status, {NYql::TIssue(message)}));
261+
Send(replyActorId, new TEvContinueRequest(status, {}, {NYql::TIssue(message)}));
262262
}
263263

264264
void ReplyCleanupError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {

ydb/core/kqp/workload_service/kqp_workload_service.h

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/core/resource_pools/resource_pool_settings.h>
34
#include <ydb/library/actors/core/actor.h>
45

56

@@ -8,15 +9,7 @@ namespace NKikimr::NKqp {
89
namespace NWorkload {
910

1011
struct TWorkloadManagerConfig {
11-
struct TPoolConfig {
12-
ui64 ConcurrentQueryLimit = 0; // 0 = infinity
13-
ui64 QueryCountLimit = 0; // 0 = infinity
14-
TDuration QueryCancelAfter = TDuration::Days(1);
15-
16-
TString ACL = ""; // empty = full access for all users
17-
};
18-
19-
std::unordered_map<TString, TPoolConfig> Pools;
12+
std::unordered_map<TString, NResourcePool::TPoolSettings> Pools;
2013
};
2114

2215
void SetWorkloadManagerConfig(const TWorkloadManagerConfig& workloadManagerConfig);

ydb/core/kqp/workload_service/kqp_workload_service_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#pragma once
22

3-
#include "kqp_workload_service.h"
43
#include "kqp_workload_service_tables_impl.h"
54

65
#include <ydb/core/kqp/common/events/events.h>
6+
#include <ydb/core/resource_pools/resource_pool_settings.h>
77

88
#include <ydb/library/aclib/aclib.h>
99
#include <ydb/library/actors/core/actor.h>
@@ -36,7 +36,7 @@ class IState : public TThrRefBase {
3636

3737
using TStatePtr = TIntrusivePtr<IState>;
3838

39-
TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters);
39+
TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);
4040

4141
} // NQueue
4242

ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ class TStateBase : public IState {
7474
};
7575

7676
public:
77-
TStateBase(const TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters)
77+
TStateBase(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
7878
: Counters(counters)
79+
, PoolConfig(poolConfig)
7980
, ActorContext(actorContext)
8081
, PoolId(poolId)
8182
, CancelAfter(poolConfig.QueryCancelAfter)
@@ -96,7 +97,7 @@ class TStateBase : public IState {
9697

9798
bool PlaceRequest(const TActorId& workerActorId, const TString& sessionId) final {
9899
if (LocalSessions.contains(sessionId)) {
99-
ActorContext.Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)}));
100+
ActorContext.Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)}));
100101
return false;
101102
}
102103

@@ -143,7 +144,7 @@ class TStateBase : public IState {
143144
}
144145

145146
void ReplyContinue(TRequest* request, Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, NYql::TIssues issues = {}) {
146-
ActorContext.Send(request->WorkerActorId, new TEvContinueRequest(status, std::move(issues)));
147+
ActorContext.Send(request->WorkerActorId, new TEvContinueRequest(status, PoolConfig, std::move(issues)));
147148

148149
if (status == Ydb::StatusIds::SUCCESS) {
149150
LocalInFlight++;
@@ -250,12 +251,12 @@ class TStateBase : public IState {
250251
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
251252
}
252253

253-
static ui64 GetMaxPoolSize(const TWorkloadManagerConfig::TPoolConfig& poolConfig) {
254+
static ui64 GetMaxPoolSize(const NResourcePool::TPoolSettings& poolConfig) {
254255
const ui64 queryCountLimit = poolConfig.QueryCountLimit;
255256
return queryCountLimit ? queryCountLimit : std::numeric_limits<ui64>::max();
256257
}
257258

258-
static ui64 GetMaxInFlight(const TWorkloadManagerConfig::TPoolConfig& poolConfig) {
259+
static ui64 GetMaxInFlight(const NResourcePool::TPoolSettings& poolConfig) {
259260
const ui64 queueSizeLimit = GetMaxPoolSize(poolConfig);
260261
const ui64 concurrentQueryLimit = poolConfig.ConcurrentQueryLimit;
261262
return std::min(concurrentQueryLimit ? concurrentQueryLimit : std::numeric_limits<ui64>::max(), queueSizeLimit);
@@ -277,6 +278,7 @@ class TStateBase : public IState {
277278
protected:
278279
NMonitoring::TDynamicCounterPtr Counters;
279280

281+
const NResourcePool::TPoolSettings PoolConfig;
280282
const TActorContext ActorContext;
281283
const TString PoolId;
282284
const TDuration CancelAfter;
@@ -306,7 +308,7 @@ class TUnlimitedState : public TStateBase {
306308
using TBase = TStateBase;
307309

308310
public:
309-
TUnlimitedState(const TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters)
311+
TUnlimitedState(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
310312
: TBase(actorContext, poolId, poolConfig, counters)
311313
{
312314
Y_ENSURE(InFlightLimit == std::numeric_limits<ui64>::max());
@@ -341,7 +343,7 @@ class TFifoState : public TStateBase {
341343
static constexpr ui64 MAX_PENDING_REQUESTS = 1000;
342344

343345
public:
344-
TFifoState(TActorContext actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters)
346+
TFifoState(TActorContext actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
345347
: TBase(actorContext, poolId, poolConfig, counters)
346348
{
347349
Y_ENSURE(InFlightLimit < std::numeric_limits<ui64>::max());
@@ -667,7 +669,7 @@ class TFifoState : public TStateBase {
667669

668670
} // anonymous namespace
669671

670-
TStatePtr CreateState(const TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters) {
672+
TStatePtr CreateState(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) {
671673
if (!poolConfig.ConcurrentQueryLimit && !poolConfig.QueryCountLimit) {
672674
return MakeIntrusive<TUnlimitedState>(actorContext, poolId, poolConfig, counters);
673675
}

0 commit comments

Comments
 (0)