Skip to content

Commit 43937fb

Browse files
authored
Merge 16ceb2b into a169e9b
2 parents a169e9b + 16ceb2b commit 43937fb

File tree

11 files changed

+167
-17
lines changed

11 files changed

+167
-17
lines changed

ydb/core/kqp/common/events/query.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#pragma once
2+
#include <ydb/core/resource_pools/resource_pool_settings.h>
23
#include <ydb/core/protos/kqp.pb.h>
34
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
45
#include <ydb/core/kqp/common/kqp_user_request_context.h>
@@ -342,6 +343,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
342343
return Record.GetRequest().GetPoolId();
343344
}
344345

346+
void SetPoolConfig(const NResourcePool::TPoolSettings& config) {
347+
PoolConfig = config;
348+
}
349+
350+
std::optional<NResourcePool::TPoolSettings> GetPoolConfig() const {
351+
return PoolConfig;
352+
}
353+
345354
mutable NKikimrKqp::TEvQueryRequest Record;
346355

347356
private:
@@ -370,6 +379,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
370379
TDuration CancelAfter;
371380
TIntrusivePtr<TUserRequestContext> UserRequestContext;
372381
TDuration ProgressStatsPeriod;
382+
std::optional<NResourcePool::TPoolSettings> PoolConfig;
373383
};
374384

375385
struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,

ydb/core/kqp/common/events/workload_service.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestInto
2424
const TString SessionId;
2525
TString PoolId; // Can be changed to default pool id
2626
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
27+
bool FromCache = false;
2728
};
2829

2930
struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqpWorkloadServiceEvents::EvContinueRequest> {
@@ -66,4 +67,18 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
6667
const NYql::TIssues Issues;
6768
};
6869

70+
struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
71+
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
72+
: Database(database)
73+
, PoolId(poolId)
74+
, Config(config)
75+
, SecurityObject(securityObject)
76+
{}
77+
78+
const TString Database;
79+
const TString PoolId;
80+
const std::optional<NResourcePool::TPoolSettings> Config;
81+
const std::optional<NACLib::TSecurityObject> SecurityObject;
82+
};
83+
6984
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/common/kqp_user_request_context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace NKikimr::NKqp {
1515
TString CurrentExecutionId;
1616
TString CustomerSuppliedId;
1717
TString PoolId;
18-
NResourcePool::TPoolSettings PoolConfig;
18+
std::optional<NResourcePool::TPoolSettings> PoolConfig;
1919

2020
TUserRequestContext() = default;
2121

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ struct TKqpWorkloadServiceEvents {
174174
EvContinueRequest,
175175
EvCleanupRequest,
176176
EvCleanupResponse,
177+
EvUpdatePoolInfo,
177178
};
178179
};
179180

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <ydb/core/cms/console/console.h>
1212
#include <ydb/core/kqp/counters/kqp_counters.h>
1313
#include <ydb/core/kqp/common/events/script_executions.h>
14+
#include <ydb/core/kqp/common/events/workload_service.h>
1415
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
1516
#include <ydb/core/kqp/common/kqp_timeouts.h>
1617
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
@@ -696,11 +697,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
696697
LocalSessions->AttachQueryText(sessionInfo, ev->Get()->GetQuery());
697698
}
698699

699-
if (!FeatureFlags.GetEnableResourcePools()) {
700-
ev->Get()->SetPoolId("");
701-
} else if (!ev->Get()->GetPoolId()) {
702-
// TODO: do not use default pool if there is no limits
703-
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
700+
if (!TryGetPoolInfoFromCache(ev, requestId)) {
701+
return;
704702
}
705703

706704
TActorId targetId;
@@ -1352,6 +1350,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
13521350
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
13531351
hFunc(TEvKqp::TEvListSessionsRequest, Handle);
13541352
hFunc(TEvKqp::TEvListProxyNodesRequest, Handle);
1353+
hFunc(NWorkload::TEvUpdatePoolInfo, Handle);
13551354
default:
13561355
Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
13571356
ev->GetTypeRewrite(), ev->ToString().data());
@@ -1559,6 +1558,43 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
15591558
}
15601559
}
15611560

1561+
bool TryGetPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) {
1562+
if (!FeatureFlags.GetEnableResourcePools()) {
1563+
ev->Get()->SetPoolId("");
1564+
return true;
1565+
}
1566+
1567+
if (!ev->Get()->GetPoolId()) {
1568+
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
1569+
}
1570+
1571+
const auto& poolId = ev->Get()->GetPoolId();
1572+
const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(ev->Get()->GetDatabase(), poolId);
1573+
if (!poolInfo) {
1574+
return true;
1575+
}
1576+
1577+
const auto& securityObject = poolInfo->SecurityObject;
1578+
const auto& userToken = ev->Get()->GetUserToken();
1579+
if (securityObject && userToken && !userToken->GetSerializedToken().empty()) {
1580+
if (!securityObject->CheckAccess(NACLib::EAccessRights::DescribeSchema, *userToken)) {
1581+
ReplyProcessError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions", requestId);
1582+
return false;
1583+
}
1584+
if (!securityObject->CheckAccess(NACLib::EAccessRights::SelectRow, *userToken)) {
1585+
ReplyProcessError(Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You don't have access permissions for resource pool " << poolId, requestId);
1586+
return false;
1587+
}
1588+
}
1589+
1590+
const auto& poolConfig = poolInfo->Config;
1591+
if (poolConfig.ConcurrentQueryLimit == -1 && poolConfig.DatabaseLoadCpuThreshold < 0.0 && !poolConfig.QueryCancelAfter) {
1592+
ev->Get()->SetPoolConfig(poolConfig);
1593+
}
1594+
1595+
return true;
1596+
}
1597+
15621598
void UpdateYqlLogLevels() {
15631599
const auto& kqpYqlName = NKikimrServices::EServiceKikimr_Name(NKikimrServices::KQP_YQL);
15641600
for (auto &entry : LogConfig.GetEntry()) {
@@ -1744,6 +1780,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
17441780
Send(ev->Sender, result.release(), 0, ev->Cookie);
17451781
}
17461782

1783+
void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) {
1784+
ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject);
1785+
}
1786+
17471787
private:
17481788
NKikimrConfig::TLogConfig LogConfig;
17491789
NKikimrConfig::TTableServiceConfig TableServiceConfig;
@@ -1805,6 +1845,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
18051845
std::deque<TDelayedEvent> DelayedEventsQueue;
18061846
bool IsLookupByRmScheduled = false;
18071847
TActorId KqpTempTablesAgentActor;
1848+
1849+
TResourcePoolsCache ResourcePoolsCache;
18081850
};
18091851

18101852
} // namespace

ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/core/base/appdata.h>
4+
#include <ydb/core/base/path.h>
45
#include <ydb/core/kqp/common/kqp.h>
56
#include <ydb/core/kqp/counters/kqp_counters.h>
67
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
@@ -415,4 +416,40 @@ class TLocalSessionsRegistry {
415416
}
416417
};
417418

419+
class TResourcePoolsCache {
420+
struct TPoolInfo {
421+
NResourcePool::TPoolSettings Config;
422+
std::optional<NACLib::TSecurityObject> SecurityObject;
423+
};
424+
425+
public:
426+
std::optional<TPoolInfo> GetPoolInfo(const TString& database, const TString& poolId) const {
427+
auto it = PoolsCache.find(GetPoolKey(database, poolId));
428+
if (it == PoolsCache.end()) {
429+
return std::nullopt;
430+
}
431+
return it->second;
432+
}
433+
434+
void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject) {
435+
const TString& poolKey = GetPoolKey(database, poolId);
436+
if (!config) {
437+
PoolsCache.erase(poolKey);
438+
return;
439+
}
440+
441+
auto& poolInfo = PoolsCache[poolKey];
442+
poolInfo.Config = *config;
443+
poolInfo.SecurityObject = securityObject;
444+
}
445+
446+
private:
447+
static TString GetPoolKey(const TString& database, const TString& poolId) {
448+
return CanonizePath(TStringBuilder() << database << "/" << poolId);
449+
}
450+
451+
private:
452+
std::unordered_map<TString, TPoolInfo> PoolsCache;
453+
};
454+
418455
} // namespace NKikimr::NKqp

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TKqpQueryState : public TNonCopyable {
8585
UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId);
8686
}
8787
UserRequestContext->PoolId = RequestEv->GetPoolId();
88+
UserRequestContext->PoolConfig = RequestEv->GetPoolConfig();
8889
}
8990

9091
// the monotonously growing counter, the ordinal number of the query,

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,20 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
242242
}
243243

244244
void PassRequestToResourcePool() {
245-
Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvPlaceRequestIntoPool(
246-
QueryState->Database,
247-
SessionId,
248-
QueryState->UserRequestContext->PoolId,
249-
QueryState->UserToken
250-
), IEventHandle::FlagTrackDelivery);
245+
const TString& poolId = QueryState->UserRequestContext->PoolId;
246+
auto event = std::make_unique<NWorkload::TEvPlaceRequestIntoPool>(QueryState->Database, SessionId, poolId, QueryState->UserToken);
251247

252-
Become(&TKqpSessionActor::ExecuteState);
248+
TEventFlags flags = 0;
249+
if (QueryState->UserRequestContext->PoolConfig) {
250+
LOG_D("request placed into pool from cache: " << poolId);
251+
event->FromCache = true;
252+
CompileQuery();
253+
} else {
254+
flags |= IEventHandle::FlagTrackDelivery;
255+
Become(&TKqpSessionActor::ExecuteState);
256+
}
257+
258+
Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), event.release(), flags);
253259
}
254260

255261
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
141141

142142
// Schemeboard events
143143
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
144-
IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted);
144+
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
145145
IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable);
146146
)
147147

@@ -150,6 +150,8 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
150150
this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0));
151151
}
152152

153+
SendPoolInfoUpdate(std::nullopt, std::nullopt);
154+
153155
Counters.OnCleanup();
154156

155157
TBase::PassAway();
@@ -254,6 +256,24 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
254256
NResourcePool::TPoolSettings poolConfig;
255257
ParsePoolSettings(result->GetPathDescription().GetResourcePoolDescription(), poolConfig);
256258
UpdatePoolConfig(poolConfig);
259+
260+
const auto& pathDescription = result->GetPathDescription().GetSelf();
261+
NACLib::TSecurityObject object(pathDescription.GetOwner(), false);
262+
if (object.MutableACL()->ParseFromString(pathDescription.GetEffectiveACL())) {
263+
SendPoolInfoUpdate(poolConfig, object);
264+
} else {
265+
SendPoolInfoUpdate(poolConfig, std::nullopt);
266+
}
267+
}
268+
269+
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) {
270+
if (ev->Get()->Key != WatchKey) {
271+
// Skip old paths watch notifications
272+
return;
273+
}
274+
275+
LOG_D("Got delete notification");
276+
SendPoolInfoUpdate(std::nullopt, std::nullopt);
257277
}
258278

259279
public:
@@ -318,6 +338,10 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
318338
RemoveRequest(request);
319339
}
320340

341+
void SendPoolInfoUpdate(const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject) const {
342+
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), new TEvUpdatePoolInfo(Database, PoolId, config, securityObject));
343+
}
344+
321345
protected:
322346
virtual bool ShouldResign() const = 0;
323347
virtual void OnScheduleRequest(TRequest* request) = 0;

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,18 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
132132
return;
133133
}
134134

135-
LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
135+
const TString& database = ev->Get()->Database;
136+
const TString& poolId = ev->Get()->PoolId;
137+
138+
LOG_D("Recieved new request from " << workerActorId << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
139+
if (ev->Get()->FromCache) {
140+
if (auto poolState = GetPoolState(database, poolId)) {
141+
// Update LastUpdateTime due to pool idle checks
142+
poolState->LastUpdateTime = TInstant::Now();
143+
}
144+
return;
145+
}
146+
136147
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
137148
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
138149
}
@@ -392,7 +403,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
392403
}
393404
IdleChecksStarted = true;
394405

395-
Schedule(IDLE_DURATION / 2, new TEvents::TEvWakeup());
406+
Schedule(IDLE_DURATION / 2, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::IdleCheck)));
396407
}
397408

398409
void RunIdleCheck() {
@@ -446,7 +457,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
446457
}
447458

448459
void ScheduleNodeInfoRequest() const {
449-
Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::StartCpuLoadRequest)));
460+
Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::StartNodeInfoRequest)));
450461
}
451462

452463
void RunNodeInfoRequest() const {

0 commit comments

Comments
 (0)