Skip to content

Commit 6569b37

Browse files
authored
Merge 2a4923c into bcf8689
2 parents bcf8689 + 2a4923c commit 6569b37

File tree

17 files changed

+363
-50
lines changed

17 files changed

+363
-50
lines changed

ydb/core/kqp/common/events/events.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ struct TEvKqp {
108108
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
109109
TEvScriptRequest() = default;
110110

111+
const TString& GetDatabase() const {
112+
return Record.GetRequest().GetDatabase();
113+
}
114+
115+
const TString& GetDatabaseId() const {
116+
return Record.GetRequest().GetDatabaseId();
117+
}
118+
119+
void SetDatabaseId(const TString& databaseId) {
120+
Record.MutableRequest()->SetDatabaseId(databaseId);
121+
}
122+
111123
mutable NKikimrKqp::TEvQueryRequest Record;
112124
TDuration ForgetAfter;
113125
TDuration ResultsTtl;
@@ -161,6 +173,36 @@ struct TEvKqp {
161173
return issues;
162174
}
163175
};
176+
177+
struct TEvSubscribeOnDatabase : public TEventLocal<TEvSubscribeOnDatabase, TKqpEvents::EvSubscribeOnDatabase> {
178+
explicit TEvSubscribeOnDatabase(const TString& database)
179+
: Database(database)
180+
{}
181+
182+
TString Database;
183+
};
184+
185+
struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
186+
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
187+
: Status(status)
188+
, Database(database)
189+
, Issues(std::move(issues))
190+
{}
191+
192+
TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
193+
: Status(Ydb::StatusIds::SUCCESS)
194+
, Database(database)
195+
, DatabaseId(databaseId)
196+
, Serverless(serverless)
197+
, Issues({})
198+
{}
199+
200+
Ydb::StatusIds::StatusCode Status;
201+
TString Database;
202+
TString DatabaseId;
203+
bool Serverless = false;
204+
NYql::TIssues Issues;
205+
};
164206
};
165207

166208
} // namespace NKikimr::NKqp

ydb/core/kqp/common/events/query.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,17 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
351351
return PoolConfig;
352352
}
353353

354+
const TString& GetDatabaseId() const {
355+
if (DatabaseId) {
356+
return DatabaseId;
357+
}
358+
return Record.GetRequest().GetDatabaseId();
359+
}
360+
361+
void SetDatabaseId(const TString& databaseId) {
362+
DatabaseId = databaseId;
363+
}
364+
354365
mutable NKikimrKqp::TEvQueryRequest Record;
355366

356367
private:
@@ -363,6 +374,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
363374
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
364375
TActorId RequestActorId;
365376
TString Database;
377+
TString DatabaseId;
366378
TString SessionId;
367379
TString YqlText;
368380
TString QueryId;

ydb/core/kqp/common/events/script_executions.h

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
2222
FS_ROLLBACK,
2323
};
2424

25-
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
26-
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
25+
template <typename TEv, ui32 TEventType>
26+
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
27+
TEventWithDatabaseId(const TString& database)
2728
: Database(database)
28-
, OperationId(id)
2929
{}
3030

31+
const TString& GetDatabase() const {
32+
return Database;
33+
}
34+
35+
const TString& GetDatabaseId() const {
36+
return DatabaseId;
37+
}
38+
39+
void SetDatabaseId(const TString& databaseId) {
40+
DatabaseId = databaseId;
41+
}
42+
3143
const TString Database;
44+
TString DatabaseId;
45+
};
46+
47+
struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
48+
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
49+
: TEventWithDatabaseId(database)
50+
, OperationId(id)
51+
{}
52+
3253
const NOperationId::TOperationId OperationId;
3354
};
3455

@@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
4364
NYql::TIssues Issues;
4465
};
4566

46-
struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
47-
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
48-
: Database(database)
67+
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
68+
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
69+
: TEventWithDatabaseId(database)
4970
, OperationId(id)
50-
{
51-
}
71+
{}
5272

53-
TString Database;
5473
NOperationId::TOperationId OperationId;
5574
};
5675

@@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
97116
TMaybe<google::protobuf::Any> Metadata;
98117
};
99118

100-
struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
119+
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
101120
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
102-
: Database(database)
121+
: TEventWithDatabaseId(database)
103122
, PageSize(pageSize)
104123
, PageToken(pageToken)
105124
{}
106125

107-
TString Database;
108126
ui64 PageSize;
109127
TString PageToken;
110128
};
@@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
151169
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
152170
};
153171

154-
struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
155-
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
156-
: Database(database)
172+
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
173+
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
174+
: TEventWithDatabaseId(database)
157175
, OperationId(id)
158-
{
159-
}
176+
{}
160177

161-
TString Database;
162178
NOperationId::TOperationId OperationId;
163179
};
164180

ydb/core/kqp/common/events/workload_service.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,4 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
9090
const std::optional<NACLib::TSecurityObject> SecurityObject;
9191
};
9292

93-
struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
94-
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
95-
: Database(database)
96-
, Serverless(serverless)
97-
{}
98-
99-
const TString Database;
100-
const bool Serverless;
101-
};
102-
10393
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/common/kqp_event_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
9090
Record.MutableRequest()->SetPoolId(PoolId);
9191
}
9292

93+
if (!DatabaseId.empty()) {
94+
Record.MutableRequest()->SetDatabaseId(DatabaseId);
95+
}
96+
9397
Record.MutableRequest()->SetSessionId(SessionId);
9498
Record.MutableRequest()->SetAction(QueryAction);
9599
Record.MutableRequest()->SetType(QueryType);

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ struct TKqpEvents {
4343
EvListSessionsRequest,
4444
EvListSessionsResponse,
4545
EvListProxyNodesRequest,
46-
EvListProxyNodesResponse
46+
EvListProxyNodesResponse,
47+
EvSubscribeOnDatabase,
48+
EvUpdateDatabaseInfo
4749
};
4850

4951
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
@@ -174,7 +176,6 @@ struct TKqpWorkloadServiceEvents {
174176
EvCleanupRequest,
175177
EvCleanupResponse,
176178
EvUpdatePoolInfo,
177-
EvUpdateDatabaseInfo,
178179
EvSubscribeOnPoolChanges,
179180
};
180181
};
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
#include "kqp_proxy_service_impl.h"
2+
3+
#include <ydb/core/kqp/workload_service/actors/actors.h>
4+
#include <ydb/core/kqp/workload_service/common/events.h>
5+
6+
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
7+
8+
9+
namespace NKikimr::NKqp {
10+
11+
namespace {
12+
13+
class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
14+
using TBase = TActor<TDatabaseSubscriberActor>;
15+
16+
struct TDatabaseState {
17+
bool FetchRequestIsRunning = false;
18+
TPathId WatchPathId;
19+
20+
TString DatabaseId;
21+
bool Serverless = false;
22+
std::unordered_set<TActorId> Subscribers;
23+
};
24+
25+
public:
26+
TDatabaseSubscriberActor()
27+
: TBase(&TDatabaseSubscriberActor::StateFunc)
28+
{}
29+
30+
void Handle(TEvKqp::TEvSubscribeOnDatabase::TPtr& ev) {
31+
const TString& database = CanonizePath(ev->Get()->Database);
32+
auto& databaseState = Subscriptions[database];
33+
34+
if (databaseState.DatabaseId) {
35+
SendSubscriberInfo(database, ev->Sender, databaseState, Ydb::StatusIds::SUCCESS);
36+
} else if (!databaseState.FetchRequestIsRunning) {
37+
Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database));
38+
databaseState.FetchRequestIsRunning = true;
39+
}
40+
41+
databaseState.Subscribers.insert(ev->Sender);
42+
}
43+
44+
void Handle(NWorkload::TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
45+
const TString& database = CanonizePath(ev->Get()->Database);
46+
auto& databaseState = Subscriptions[database];
47+
48+
UpdateDatabaseState(databaseState, database, ev->Get()->PathId, ev->Get()->Serverless);
49+
UpdateSubscribersInfo(database, databaseState, ev->Get()->Status, ev->Get()->Issues);
50+
51+
databaseState.FetchRequestIsRunning = false;
52+
databaseState.WatchPathId = ev->Get()->PathId;
53+
54+
if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
55+
WatchKey++;
56+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(databaseState.WatchPathId, WatchKey));
57+
WatchDatabases.insert({WatchKey, database});
58+
}
59+
}
60+
61+
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) {
62+
auto it = WatchDatabases.find(ev->Get()->Key);
63+
if (it == WatchDatabases.end()) {
64+
return;
65+
}
66+
67+
const auto& result = ev->Get()->Result;
68+
if (!result || result->GetStatus() != NKikimrScheme::StatusSuccess) {
69+
return;
70+
}
71+
72+
if (result->GetPathDescription().HasDomainDescription()) {
73+
NSchemeCache::TDomainInfo description(result->GetPathDescription().GetDomainDescription());
74+
75+
auto& databaseState = Subscriptions[it->second];
76+
UpdateDatabaseState(databaseState, it->second, description.DomainKey, description.IsServerless());
77+
UpdateSubscribersInfo(it->second, databaseState, Ydb::StatusIds::SUCCESS);
78+
}
79+
}
80+
81+
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) {
82+
auto it = WatchDatabases.find(ev->Get()->Key);
83+
if (it == WatchDatabases.end()) {
84+
return;
85+
}
86+
87+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(ev->Get()->Key));
88+
89+
auto databaseStateIt = Subscriptions.find(it->second);
90+
if (databaseStateIt != Subscriptions.end()) {
91+
UpdateSubscribersInfo(it->second, databaseStateIt->second, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}});
92+
Subscriptions.erase(databaseStateIt);
93+
}
94+
95+
WatchDatabases.erase(it);
96+
}
97+
98+
void HandlePoison() {
99+
if (!WatchDatabases.empty()) {
100+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0));
101+
}
102+
103+
TBase::PassAway();
104+
}
105+
106+
STRICT_STFUNC(StateFunc,
107+
hFunc(TEvKqp::TEvSubscribeOnDatabase, Handle);
108+
hFunc(NWorkload::TEvPrivate::TEvFetchDatabaseResponse, Handle);
109+
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
110+
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
111+
sFunc(TEvents::TEvPoison, HandlePoison);
112+
)
113+
114+
private:
115+
void UpdateDatabaseState(TDatabaseState& databaseState, const TString& database, TPathId pathId, bool serverless) {
116+
databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << database;
117+
databaseState.Serverless = serverless;
118+
}
119+
120+
void UpdateSubscribersInfo(const TString& database, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
121+
for (const auto& subscriber : databaseState.Subscribers) {
122+
SendSubscriberInfo(database, subscriber, databaseState, status, issues);
123+
}
124+
}
125+
126+
void SendSubscriberInfo(const TString& database, TActorId subscriber, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
127+
if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) {
128+
Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless));
129+
} else {
130+
NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database" << database);
131+
for (const auto& issue : issues) {
132+
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
133+
}
134+
Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, {rootIssue}));
135+
}
136+
}
137+
138+
private:
139+
std::unordered_map<TString, TDatabaseState> Subscriptions;
140+
std::unordered_map<ui32, TString> WatchDatabases;
141+
ui32 WatchKey = 0;
142+
};
143+
144+
} // anonymous namespace
145+
146+
void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) {
147+
SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor());
148+
}
149+
150+
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)