66#include < ydb/core/tx/tx_proxy/proxy.h>
77#include < ydb/core/kqp/session_actor/kqp_worker_common.h>
88#include < ydb/core/tx/schemeshard/schemeshard_build_index.h>
9+ #include < ydb/services/metadata/abstract/kqp_common.h>
910
1011namespace NKikimr ::NKqp {
1112
@@ -49,7 +50,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
4950 return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
5051 }
5152
52- TKqpSchemeExecuter (TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
53+ TKqpSchemeExecuter (TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
5354 const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
5455 bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
5556 : PhyTx(phyTx)
@@ -69,13 +70,11 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
6970 }
7071
7172 void StartBuildOperation () {
72- const auto & schemeOp = PhyTx->GetSchemeOperation ();
73- auto buildOp = schemeOp.GetBuildOperation ();
7473 Send (MakeTxProxyID (), new TEvTxUserProxy::TEvAllocateTxId);
75- Become (&TKqpSchemeExecuter::ExecuteState);
74+ Become (&TKqpSchemeExecuter::ExecuteState);
7675 }
7776
78- void Bootstrap () {
77+ void MakeSchemeOperationRequest () {
7978 using TRequest = TEvTxUserProxy::TEvProposeTransaction;
8079
8180 auto ev = MakeHolder<TRequest>();
@@ -124,30 +123,44 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
124123 }
125124
126125 case NKqpProto::TKqpSchemeOperation::kAlterTable : {
127- auto modifyScheme = schemeOp.GetAlterTable ();
126+ const auto & modifyScheme = schemeOp.GetAlterTable ();
128127 ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
129128 break ;
130129 }
131130
132131 case NKqpProto::TKqpSchemeOperation::kBuildOperation : {
133- auto buildOp = schemeOp.GetBuildOperation ();
134132 return StartBuildOperation ();
135133 }
136134
137135 case NKqpProto::TKqpSchemeOperation::kCreateUser : {
138- auto modifyScheme = schemeOp.GetCreateUser ();
136+ const auto & modifyScheme = schemeOp.GetCreateUser ();
139137 ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
140138 break ;
141139 }
142140
143141 case NKqpProto::TKqpSchemeOperation::kAlterUser : {
144- auto modifyScheme = schemeOp.GetAlterUser ();
142+ const auto & modifyScheme = schemeOp.GetAlterUser ();
145143 ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
146144 break ;
147145 }
148146
149147 case NKqpProto::TKqpSchemeOperation::kDropUser : {
150- auto modifyScheme = schemeOp.GetDropUser ();
148+ const auto & modifyScheme = schemeOp.GetDropUser ();
149+ ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
150+ break ;
151+ }
152+ case NKqpProto::TKqpSchemeOperation::kCreateExternalTable : {
153+ const auto & modifyScheme = schemeOp.GetCreateExternalTable ();
154+ ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
155+ break ;
156+ }
157+ case NKqpProto::TKqpSchemeOperation::kAlterExternalTable : {
158+ const auto & modifyScheme = schemeOp.GetAlterExternalTable ();
159+ ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
160+ break ;
161+ }
162+ case NKqpProto::TKqpSchemeOperation::kDropExternalTable : {
163+ const auto & modifyScheme = schemeOp.GetDropExternalTable ();
151164 ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
152165 break ;
153166 }
@@ -191,7 +204,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
191204 auto promise = NewPromise<IKqpGateway::TGenericResult>();
192205
193206 bool successOnNotExist = false ;
194- bool failedOnAlreadyExists = false ;
207+ bool failedOnAlreadyExists = false ;
195208 // exists/not exists semantics supported only in the query service.
196209 if (IsQueryService ()) {
197210 successOnNotExist = ev->Record .GetTransaction ().GetModifyScheme ().GetSuccessOnNotExist ();
@@ -218,6 +231,57 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
218231 Become (&TKqpSchemeExecuter::ExecuteState);
219232 }
220233
234+ void MakeObjectRequest () {
235+ const auto & schemeOp = PhyTx->GetSchemeOperation ();
236+ NMetadata::IClassBehaviour::TPtr cBehaviour (NMetadata::IClassBehaviour::TFactory::Construct (schemeOp.GetObjectType ()));
237+ if (!cBehaviour) {
238+ InternalError (TStringBuilder () << " Unsupported object type: \" " << schemeOp.GetObjectType () << " \" " );
239+ return ;
240+ }
241+
242+ if (!cBehaviour->GetOperationsManager ()) {
243+ InternalError (TStringBuilder () << " Object type \" " << schemeOp.GetObjectType () << " \" does not have manager for operations" );
244+ }
245+
246+ auto * actorSystem = TActivationContext::ActorSystem ();
247+ auto selfId = SelfId ();
248+
249+ NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
250+ context.SetDatabase (Database);
251+ context.SetActorSystem (actorSystem);
252+ if (UserToken) {
253+ context.SetUserToken (*UserToken);
254+ }
255+
256+ auto resultFuture = cBehaviour->GetOperationsManager ()->ExecutePrepared (schemeOp, cBehaviour, context);
257+
258+ using TResultFuture = NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus>;
259+ resultFuture.Subscribe ([actorSystem, selfId](const TResultFuture& f) {
260+ const auto & status = f.GetValue ();
261+ auto ev = MakeHolder<TEvPrivate::TEvResult>();
262+ if (status.Ok ()) {
263+ ev->Result .SetSuccess ();
264+ } else {
265+ ev->Result .SetStatus (status.GetStatus ());
266+ if (TString message = status.GetErrorMessage ()) {
267+ ev->Result .AddIssue (NYql::TIssue{message});
268+ }
269+ }
270+ actorSystem->Send (selfId, ev.Release ());
271+ });
272+
273+ Become (&TKqpSchemeExecuter::ObjectExecuteState);
274+ }
275+
276+ void Bootstrap () {
277+ const auto & schemeOp = PhyTx->GetSchemeOperation ();
278+ if (schemeOp.GetObjectType ()) {
279+ MakeObjectRequest ();
280+ } else {
281+ MakeSchemeOperationRequest ();
282+ }
283+ }
284+
221285public:
222286 STATEFN (ExecuteState) {
223287 try {
@@ -240,6 +304,19 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
240304 }
241305 }
242306
307+ STATEFN (ObjectExecuteState) {
308+ try {
309+ switch (ev->GetTypeRewrite ()) {
310+ hFunc (TEvPrivate::TEvResult, HandleExecute);
311+ hFunc (TEvKqp::TEvAbortExecution, HandleAbortExecution);
312+ default :
313+ UnexpectedEvent (" ObjectExecuteState" , ev->GetTypeRewrite ());
314+ }
315+ } catch (const yexception& e) {
316+ InternalError (e.what ());
317+ }
318+ }
319+
243320
244321 void Handle (TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
245322 const auto * msg = ev->Get ();
@@ -250,18 +327,18 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
250327
251328 void Navigate (const TActorId& schemeCache) {
252329 const auto & schemeOp = PhyTx->GetSchemeOperation ();
253- auto buildOp = schemeOp.GetBuildOperation ();
330+ const auto & buildOp = schemeOp.GetBuildOperation ();
254331 const auto & path = buildOp.source_path ();
255332
256333 const auto paths = NKikimr::SplitPath (path);
257334 if (paths.empty ()) {
258335 TString error = TStringBuilder () << " Failed to split table path " << path;
259336 return ReplyErrorAndDie (Ydb::StatusIds::BAD_REQUEST, NYql::TIssue (error));
260337 }
261-
338+
262339 auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
263340
264- request->DatabaseName = Database;
341+ request->DatabaseName = Database;
265342 auto & entry = request->ResultSet .emplace_back ();
266343 entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
267344 entry.Path = ::NKikimr::SplitPath (path);
@@ -312,7 +389,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
312389 }
313390
314391 const auto & schemeOp = PhyTx->GetSchemeOperation ();
315- auto buildOp = schemeOp.GetBuildOperation ();
392+ const auto & buildOp = schemeOp.GetBuildOperation ();
316393 SetSchemeShardId (domainInfo->ExtractSchemeShard ());
317394 auto req = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, Database, buildOp);
318395 ForwardToSchemeShard (std::move (req));
0 commit comments