@@ -1293,52 +1293,57 @@ class TKqpGatewayProxy : public IKikimrGateway {
12931293 bool createDir, bool existingOk) override {
12941294 CHECK_PREPARED_DDL (CreateColumnTable);
12951295
1296- const auto & cluster = metadata->Cluster ;
1296+ try {
1297+ const auto & cluster = metadata->Cluster ;
12971298
1298- if (cluster != SessionCtx->GetCluster ()) {
1299- return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1300- }
1299+ if (cluster != SessionCtx->GetCluster ()) {
1300+ return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1301+ }
13011302
1302- std::pair<TString, TString> pathPair;
1303- {
1304- TString error;
1305- if (!NSchemeHelpers::SplitTablePath (metadata->Name , GetDatabase (), pathPair, error, createDir)) {
1306- return MakeFuture (ResultFromError<TGenericResult>(error));
1303+ std::pair<TString, TString> pathPair;
1304+ {
1305+ TString error;
1306+ if (!NSchemeHelpers::SplitTablePath (metadata->Name , GetDatabase (), pathPair, error, createDir)) {
1307+ return MakeFuture (ResultFromError<TGenericResult>(error));
1308+ }
13071309 }
1308- }
13091310
1310- TRemoveLastPhyTxHelper phyTxRemover;
1311- auto & phyTx = phyTxRemover.Capture (SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ());
1312- phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1311+ NKikimrSchemeOp::TModifyScheme schemeTx;
1312+ schemeTx.SetWorkingDir (pathPair.first );
13131313
1314- auto & schemeTx = *phyTx. MutableSchemeOperation ()-> MutableCreateColumnTable () ;
1315- schemeTx. SetWorkingDir (pathPair. first ) ;
1314+ Ydb::StatusIds::StatusCode code ;
1315+ TString error ;
13161316
1317- Ydb::StatusIds::StatusCode code ;
1318- TString error ;
1317+ schemeTx. SetOperationType (NKikimrSchemeOp::ESchemeOpCreateColumnTable) ;
1318+ schemeTx. SetFailedOnAlreadyExists (!existingOk) ;
13191319
1320- schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpCreateColumnTable);
1321- schemeTx.SetFailedOnAlreadyExists (!existingOk);
1320+ NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable ();
13221321
1323- NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable ();
1322+ tableDesc->SetName (pathPair.second );
1323+ FillColumnTableSchema (*tableDesc->MutableSchema (), *metadata);
13241324
1325- tableDesc->SetName (pathPair.second );
1326- FillColumnTableSchema (*tableDesc->MutableSchema (), *metadata);
1325+ if (!FillCreateColumnTableDesc (metadata, *tableDesc, code, error)) {
1326+ IKqpGateway::TGenericResult errResult;
1327+ errResult.AddIssue (NYql::TIssue (error));
1328+ errResult.SetStatus (NYql::YqlStatusFromYdbStatus (code));
1329+ return MakeFuture (std::move (errResult));
1330+ }
13271331
1328- if (!FillCreateColumnTableDesc (metadata, *tableDesc, code, error)) {
1329- IKqpGateway::TGenericResult errResult;
1330- errResult.AddIssue (NYql::TIssue (error));
1331- errResult.SetStatus (NYql::YqlStatusFromYdbStatus (code));
1332- return MakeFuture (std::move (errResult));
1333- }
1332+ if (IsPrepare ()) {
1333+ auto & phyQuery = *SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ();
1334+ auto & phyTx = *phyQuery.AddTransactions ();
1335+ phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1336+ phyTx.MutableSchemeOperation ()->MutableCreateTable ()->Swap (&schemeTx);
13341337
1335- if (IsPrepare ()) {
1336- TGenericResult result;
1337- result.SetSuccess ();
1338- phyTxRemover.Forget ();
1339- return MakeFuture (result);
1340- } else {
1341- return Gateway->ModifyScheme (std::move (schemeTx));
1338+ TGenericResult result;
1339+ result.SetSuccess ();
1340+ return MakeFuture (result);
1341+ } else {
1342+ return Gateway->ModifyScheme (std::move (schemeTx));
1343+ }
1344+ }
1345+ catch (yexception& e) {
1346+ return MakeFuture (ResultFromException<TGenericResult>(e));
13421347 }
13431348 }
13441349
@@ -1347,36 +1352,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
13471352 {
13481353 CHECK_PREPARED_DDL (AlterColumnTable);
13491354
1350- if (cluster != SessionCtx->GetCluster ()) {
1351- return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1352- }
1355+ try {
1356+ if (cluster != SessionCtx->GetCluster ()) {
1357+ return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1358+ }
13531359
1354- std::pair<TString, TString> pathPair;
1355- {
1356- TString error;
1357- if (!NSchemeHelpers::SplitTablePath (settings.Table , GetDatabase (), pathPair, error, false )) {
1358- return MakeFuture (ResultFromError<TGenericResult>(error));
1360+ std::pair<TString, TString> pathPair;
1361+ {
1362+ TString error;
1363+ if (!NSchemeHelpers::SplitTablePath (settings.Table , GetDatabase (), pathPair, error, false )) {
1364+ return MakeFuture (ResultFromError<TGenericResult>(error));
1365+ }
13591366 }
1360- }
13611367
1362- TRemoveLastPhyTxHelper phyTxRemover;
1363- auto & phyTx = phyTxRemover.Capture (SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ());
1364- phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1368+ NKikimrSchemeOp::TModifyScheme schemeTx;
1369+ schemeTx.SetWorkingDir (pathPair.first );
13651370
1366- auto & schemeTx = *phyTx.MutableSchemeOperation ()->MutableCreateColumnTable ();
1367- schemeTx.SetWorkingDir (pathPair.first );
1371+ schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1372+ NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable ();
1373+ alter->SetName (settings.Table );
13681374
1369- schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1370- NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable ();
1371- alter->SetName (settings.Table );
1375+ if (IsPrepare ()) {
1376+ auto & phyQuery = *SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ();
1377+ auto & phyTx = *phyQuery.AddTransactions ();
1378+ phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1379+ phyTx.MutableSchemeOperation ()->MutableAlterColumnTable ()->Swap (&schemeTx);
13721380
1373- if (IsPrepare ()) {
1374- TGenericResult result;
1375- result.SetSuccess ();
1376- phyTxRemover.Forget ();
1377- return MakeFuture (result);
1378- } else {
1379- return Gateway->ModifyScheme (std::move (schemeTx));
1381+ TGenericResult result;
1382+ result.SetSuccess ();
1383+ return MakeFuture (result);
1384+ } else {
1385+ return Gateway->ModifyScheme (std::move (schemeTx));
1386+ }
1387+ }
1388+ catch (yexception& e) {
1389+ return MakeFuture (ResultFromException<TGenericResult>(e));
13801390 }
13811391 }
13821392
@@ -1385,43 +1395,48 @@ class TKqpGatewayProxy : public IKikimrGateway {
13851395 {
13861396 CHECK_PREPARED_DDL (CreateTableStore);
13871397
1388- if (cluster != SessionCtx->GetCluster ()) {
1389- return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1390- }
1398+ try {
1399+ if (cluster != SessionCtx->GetCluster ()) {
1400+ return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1401+ }
13911402
1392- std::pair<TString, TString> pathPair;
1393- {
1394- TString error;
1395- if (!NSchemeHelpers::SplitTablePath (settings.TableStore , GetDatabase (), pathPair, error, false )) {
1396- return MakeFuture (ResultFromError<TGenericResult>(error));
1403+ std::pair<TString, TString> pathPair;
1404+ {
1405+ TString error;
1406+ if (!NSchemeHelpers::SplitTablePath (settings.TableStore , GetDatabase (), pathPair, error, false )) {
1407+ return MakeFuture (ResultFromError<TGenericResult>(error));
1408+ }
13971409 }
1398- }
13991410
1400- TRemoveLastPhyTxHelper phyTxRemover;
1401- auto & phyTx = phyTxRemover.Capture (SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ());
1402- phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1411+ NKikimrSchemeOp::TModifyScheme schemeTx;
1412+ schemeTx.SetWorkingDir (pathPair.first );
14031413
1404- auto & schemeTx = *phyTx. MutableSchemeOperation ()-> MutableCreateTableStore ( );
1405- schemeTx.SetWorkingDir (pathPair. first );
1414+ schemeTx. SetOperationType (NKikimrSchemeOp::ESchemeOpCreateColumnStore );
1415+ schemeTx.SetFailedOnAlreadyExists (!existingOk );
14061416
1407- schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1408- schemeTx.SetFailedOnAlreadyExists (!existingOk);
1417+ NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore ();
1418+ storeDesc->SetName (pathPair.second );
1419+ storeDesc->SetColumnShardCount (settings.ShardsCount );
14091420
1410- NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx. MutableCreateColumnStore ();
1411- storeDesc ->SetName (pathPair. second );
1412- storeDesc-> SetColumnShardCount ( settings. ShardsCount );
1421+ NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc-> AddSchemaPresets ();
1422+ schemaPreset ->SetName (" default " );
1423+ FillColumnTableSchema (*schemaPreset-> MutableSchema (), settings);
14131424
1414- NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets ();
1415- schemaPreset->SetName (" default" );
1416- FillColumnTableSchema (*schemaPreset->MutableSchema (), settings);
1425+ if (IsPrepare ()) {
1426+ auto & phyQuery = *SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ();
1427+ auto & phyTx = *phyQuery.AddTransactions ();
1428+ phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1429+ phyTx.MutableSchemeOperation ()->MutableCreateTableStore ()->Swap (&schemeTx);
14171430
1418- if (IsPrepare ()) {
1419- TGenericResult result;
1420- result.SetSuccess ();
1421- phyTxRemover.Forget ();
1422- return MakeFuture (result);
1423- } else {
1424- return Gateway->ModifyScheme (std::move (schemeTx));
1431+ TGenericResult result;
1432+ result.SetSuccess ();
1433+ return MakeFuture (result);
1434+ } else {
1435+ return Gateway->ModifyScheme (std::move (schemeTx));
1436+ }
1437+ }
1438+ catch (yexception& e) {
1439+ return MakeFuture (ResultFromException<TGenericResult>(e));
14251440 }
14261441 }
14271442
@@ -1430,36 +1445,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
14301445 {
14311446 CHECK_PREPARED_DDL (AlterTableStore);
14321447
1433- if (cluster != SessionCtx->GetCluster ()) {
1434- return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1435- }
1448+ try {
1449+ if (cluster != SessionCtx->GetCluster ()) {
1450+ return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1451+ }
14361452
1437- std::pair<TString, TString> pathPair;
1438- {
1439- TString error;
1440- if (!NSchemeHelpers::SplitTablePath (settings.TableStore , GetDatabase (), pathPair, error, false )) {
1441- return MakeFuture (ResultFromError<TGenericResult>(error));
1453+ std::pair<TString, TString> pathPair;
1454+ {
1455+ TString error;
1456+ if (!NSchemeHelpers::SplitTablePath (settings.TableStore , GetDatabase (), pathPair, error, false )) {
1457+ return MakeFuture (ResultFromError<TGenericResult>(error));
1458+ }
14421459 }
1443- }
14441460
1445- TRemoveLastPhyTxHelper phyTxRemover;
1446- auto & phyTx = phyTxRemover.Capture (SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ());
1447- phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1461+ NKikimrSchemeOp::TModifyScheme schemeTx;
1462+ schemeTx.SetWorkingDir (pathPair.first );
14481463
1449- auto & schemeTx = *phyTx.MutableSchemeOperation ()->MutableAlterTableStore ();
1450- schemeTx.SetWorkingDir (pathPair.first );
1464+ schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1465+ NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore ();
1466+ alter->SetName (pathPair.second );
14511467
1452- schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1453- NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore ();
1454- alter->SetName (pathPair.second );
1468+ if (IsPrepare ()) {
1469+ auto & phyQuery = *SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ();
1470+ auto & phyTx = *phyQuery.AddTransactions ();
1471+ phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1472+ phyTx.MutableSchemeOperation ()->MutableAlterTableStore ()->Swap (&schemeTx);
14551473
1456- if (IsPrepare ()) {
1457- TGenericResult result;
1458- result.SetSuccess ();
1459- phyTxRemover.Forget ();
1460- return MakeFuture (result);
1461- } else {
1462- return Gateway->ModifyScheme (std::move (schemeTx));
1474+ TGenericResult result;
1475+ result.SetSuccess ();
1476+ return MakeFuture (result);
1477+ } else {
1478+ return Gateway->ModifyScheme (std::move (schemeTx));
1479+ }
1480+ }
1481+ catch (yexception& e) {
1482+ return MakeFuture (ResultFromException<TGenericResult>(e));
14631483 }
14641484 }
14651485
@@ -1468,36 +1488,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
14681488 {
14691489 CHECK_PREPARED_DDL (DropTableStore);
14701490
1471- if (cluster != SessionCtx->GetCluster ()) {
1472- return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1473- }
1491+ try {
1492+ if (cluster != SessionCtx->GetCluster ()) {
1493+ return MakeFuture (ResultFromError<TGenericResult>(" Invalid cluster: " + cluster));
1494+ }
14741495
1475- std::pair<TString, TString> pathPair;
1476- {
1477- TString error;
1478- if (!NSchemeHelpers::SplitTablePath (settings.TableStore , GetDatabase (), pathPair, error, false )) {
1479- return MakeFuture (ResultFromError<TGenericResult>(error));
1496+ std::pair<TString, TString> pathPair;
1497+ {
1498+ TString error;
1499+ if (!NSchemeHelpers::SplitTablePath (settings.TableStore , GetDatabase (), pathPair, error, false )) {
1500+ return MakeFuture (ResultFromError<TGenericResult>(error));
1501+ }
14801502 }
1481- }
14821503
1483- TRemoveLastPhyTxHelper phyTxRemover;
1484- auto & phyTx = phyTxRemover.Capture (SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ());
1485- phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1504+ NKikimrSchemeOp::TModifyScheme schemeTx;
1505+ schemeTx.SetWorkingDir (pathPair.first );
1506+ schemeTx.SetSuccessOnNotExist (missingOk);
1507+ schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpDropColumnStore);
1508+ NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop ();
1509+ drop->SetName (pathPair.second );
14861510
1487- auto & schemeTx = *phyTx.MutableSchemeOperation ()->MutableDropTableStore ();
1488- schemeTx.SetWorkingDir (pathPair.first );
1489- schemeTx.SetSuccessOnNotExist (missingOk);
1490- schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpDropColumnStore);
1491- NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop ();
1492- drop->SetName (pathPair.second );
1511+ if (IsPrepare ()) {
1512+ auto & phyQuery = *SessionCtx->Query ().PreparingQuery ->MutablePhysicalQuery ();
1513+ auto & phyTx = *phyQuery.AddTransactions ();
1514+ phyTx.SetType (NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1515+ phyTx.MutableSchemeOperation ()->MutableDropTableStore ()->Swap (&schemeTx);
14931516
1494- if (IsPrepare ()) {
1495- TGenericResult result;
1496- result.SetSuccess ();
1497- phyTxRemover.Forget ();
1498- return MakeFuture (result);
1499- } else {
1500- return Gateway->ModifyScheme (std::move (schemeTx));
1517+ TGenericResult result;
1518+ result.SetSuccess ();
1519+ return MakeFuture (result);
1520+ } else {
1521+ return Gateway->ModifyScheme (std::move (schemeTx));
1522+ }
1523+ }
1524+ catch (yexception& e) {
1525+ return MakeFuture (ResultFromException<TGenericResult>(e));
15011526 }
15021527 }
15031528
0 commit comments