Skip to content

Commit 0459184

Browse files
authored
Merge 84f575d into 9de503c
2 parents 9de503c + 84f575d commit 0459184

15 files changed

+866
-17
lines changed

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,5 @@ message TFeatureFlags {
128128
optional bool EnableServerlessExclusiveDynamicNodes = 113 [default = false];
129129
optional bool EnableAccessServiceBulkAuthorization = 114 [default = false];
130130
optional bool EnableAddColumsWithDefaults = 115 [ default = false];
131+
optional bool EnableReplaceIfExists = 116 [ default = false];
131132
}

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class TTestFeatureFlagsHolder {
5757
FEATURE_FLAG_SETTER(EnableServerlessExclusiveDynamicNodes)
5858
FEATURE_FLAG_SETTER(EnableAccessServiceBulkAuthorization)
5959
FEATURE_FLAG_SETTER(EnableAddColumsWithDefaults)
60+
FEATURE_FLAG_SETTER(EnableReplaceIfExists)
6061

6162
#undef FEATURE_FLAG_SETTER
6263
};

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,14 +1062,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
10621062
case TTxState::ETxType::TxDropExternalTable:
10631063
return CreateDropExternalTable(NextPartId(), txState);
10641064
case TTxState::ETxType::TxAlterExternalTable:
1065-
Y_ABORT("TODO: implement");
1065+
return CreateAlterExternalTable(NextPartId(), txState);
10661066
case TTxState::ETxType::TxCreateExternalDataSource:
10671067
return CreateNewExternalDataSource(NextPartId(), txState);
10681068
case TTxState::ETxType::TxDropExternalDataSource:
10691069
return CreateDropExternalDataSource(NextPartId(), txState);
10701070
case TTxState::ETxType::TxAlterExternalDataSource:
1071-
Y_ABORT("TODO: implement");
1072-
1071+
return CreateAlterExternalDataSource(NextPartId(), txState);
1072+
10731073
// View
10741074
case TTxState::ETxType::TxCreateView:
10751075
return CreateNewView(NextPartId(), txState);
@@ -1282,15 +1282,15 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op
12821282

12831283
// ExternalTable
12841284
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
1285-
return CreateNewExternalTable(NextPartId(), tx);
1285+
Y_ABORT("operation is handled before");
12861286
case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable:
12871287
return CreateDropExternalTable(NextPartId(), tx);
12881288
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable:
12891289
Y_ABORT("TODO: implement");
12901290

12911291
// ExternalDataSource
12921292
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource:
1293-
return CreateNewExternalDataSource(NextPartId(), tx);
1293+
Y_ABORT("operation is handled before");
12941294
case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource:
12951295
return CreateDropExternalDataSource(NextPartId(), tx);
12961296
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource:
@@ -1351,6 +1351,10 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
13511351
return CreateConsistentMoveIndex(NextPartId(), tx, context);
13521352
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
13531353
return CreateCompatibleAlterExtSubDomain(NextPartId(), tx, context);
1354+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource:
1355+
return CreateNewExternalDataSource(NextPartId(), tx, context);
1356+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
1357+
return CreateNewExternalTable(NextPartId(), tx, context);
13541358
default:
13551359
return {ConstructPart(opType, tx)};
13561360
}
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
#include "schemeshard__operation_common_external_data_source.h"
2+
#include "schemeshard__operation_part.h"
3+
#include "schemeshard__operation_common.h"
4+
#include "schemeshard_impl.h"
5+
6+
#include <utility>
7+
8+
namespace {
9+
10+
using namespace NKikimr;
11+
using namespace NSchemeShard;
12+
13+
class TPropose: public TSubOperationState {
14+
private:
15+
const TOperationId OperationId;
16+
17+
TString DebugHint() const override {
18+
return TStringBuilder()
19+
<< "TAlterExternalDataSource TPropose"
20+
<< ", operationId: " << OperationId;
21+
}
22+
23+
public:
24+
explicit TPropose(TOperationId id)
25+
: OperationId(std::move(id))
26+
{
27+
}
28+
29+
bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
30+
const TStepId step = TStepId(ev->Get()->StepId);
31+
32+
LOG_I(DebugHint() << "HandleReply TEvOperationPlan"
33+
<< ": step# " << step);
34+
35+
const TTxState* txState = context.SS->FindTx(OperationId);
36+
Y_ABORT_UNLESS(txState);
37+
Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalDataSource);
38+
39+
const auto pathId = txState->TargetPathId;
40+
const auto path = TPath::Init(pathId, context.SS);
41+
const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId);
42+
43+
NIceDb::TNiceDb db(context.GetDB());
44+
45+
IncParentDirAlterVersionWithRepublish(OperationId, path, context);
46+
47+
context.SS->ClearDescribePathCaches(pathPtr);
48+
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
49+
50+
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
51+
return true;
52+
}
53+
54+
bool ProgressState(TOperationContext& context) override {
55+
LOG_I(DebugHint() << "ProgressState");
56+
57+
const TTxState* txState = context.SS->FindTx(OperationId);
58+
Y_ABORT_UNLESS(txState);
59+
Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalDataSource);
60+
61+
context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0));
62+
return false;
63+
}
64+
};
65+
66+
class TAlterExternalDataSource : public TSubOperation {
67+
static TTxState::ETxState NextState() { return TTxState::Propose; }
68+
69+
TTxState::ETxState NextState(TTxState::ETxState state) const override {
70+
switch (state) {
71+
case TTxState::Waiting:
72+
case TTxState::Propose:
73+
return TTxState::Done;
74+
default:
75+
return TTxState::Invalid;
76+
}
77+
}
78+
79+
TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override {
80+
switch (state) {
81+
case TTxState::Waiting:
82+
case TTxState::Propose:
83+
return MakeHolder<TPropose>(OperationId);
84+
case TTxState::Done:
85+
return MakeHolder<TDone>(OperationId);
86+
default:
87+
return nullptr;
88+
}
89+
}
90+
91+
static bool IsDestinationPathValid(const THolder<TProposeResponse>& result,
92+
const TPath& dstPath,
93+
const TString& acl) {
94+
const auto checks = dstPath.Check();
95+
checks.IsAtLocalSchemeShard()
96+
.IsResolved()
97+
.NotUnderDeleting()
98+
.FailOnWrongType(TPathElement::EPathType::EPathTypeExternalDataSource)
99+
.IsValidLeafName()
100+
.DepthLimit()
101+
.PathsLimit()
102+
.DirChildrenLimit()
103+
.IsValidACL(acl);
104+
105+
if (!checks) {
106+
result->SetError(checks.GetStatus(), checks.GetError());
107+
if (dstPath.IsResolved()) {
108+
result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId));
109+
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
110+
}
111+
}
112+
113+
return static_cast<bool>(checks);
114+
}
115+
116+
bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result,
117+
const TOperationContext& context) const {
118+
TString errorMessage;
119+
if (!context.SS->CheckApplyIf(Transaction, errorMessage)) {
120+
result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage);
121+
return false;
122+
}
123+
return true;
124+
}
125+
126+
static bool IsDescriptionValid(
127+
const THolder<TProposeResponse>& result,
128+
const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
129+
const NExternalSource::IExternalSourceFactory::TPtr& factory) {
130+
TString errorMessage;
131+
if (!NExternalDataSource::Validate(desc, factory, errorMessage)) {
132+
result->SetError(NKikimrScheme::StatusSchemeError, errorMessage);
133+
return false;
134+
}
135+
return true;
136+
}
137+
138+
static void AddPathInSchemeShard(
139+
const THolder<TProposeResponse>& result, const TPath& dstPath) {
140+
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
141+
}
142+
143+
TPathElement::TPtr CreateExternalDataSourcePathElement(const TPath& dstPath) const {
144+
TPathElement::TPtr externalDataSource = dstPath.Base();
145+
146+
externalDataSource->PathState = TPathElement::EPathState::EPathStateAlter;
147+
externalDataSource->LastTxId = OperationId.GetTxId();
148+
149+
return externalDataSource;
150+
}
151+
152+
void CreateTransaction(const TOperationContext& context,
153+
const TPathId& externalDataSourcePathId) const {
154+
TTxState& txState = context.SS->CreateTx(OperationId,
155+
TTxState::TxAlterExternalDataSource,
156+
externalDataSourcePathId);
157+
txState.Shards.clear();
158+
}
159+
160+
void RegisterParentPathDependencies(const TOperationContext& context,
161+
const TPath& parentPath) const {
162+
if (parentPath.Base()->HasActiveChanges()) {
163+
const TTxId parentTxId = parentPath.Base()->PlannedToCreate()
164+
? parentPath.Base()->CreateTxId
165+
: parentPath.Base()->LastTxId;
166+
context.OnComplete.Dependence(parentTxId, OperationId.GetTxId());
167+
}
168+
}
169+
170+
void AdvanceTransactionStateToPropose(const TOperationContext& context,
171+
NIceDb::TNiceDb& db) const {
172+
context.SS->ChangeTxState(db, OperationId, TTxState::Propose);
173+
context.OnComplete.ActivateTx(OperationId);
174+
}
175+
176+
void PersistExternalDataSource(
177+
const TOperationContext& context,
178+
NIceDb::TNiceDb& db,
179+
const TPathElement::TPtr& externalDataSourcePath,
180+
const TExternalDataSourceInfo::TPtr& externalDataSourceInfo,
181+
const TString& acl) const {
182+
const auto& externalDataSourcePathId = externalDataSourcePath->PathId;
183+
184+
context.SS->ExternalDataSources[externalDataSourcePathId] = externalDataSourceInfo;
185+
context.SS->PersistPath(db, externalDataSourcePathId);
186+
187+
if (!acl.empty()) {
188+
externalDataSourcePath->ApplyACL(acl);
189+
context.SS->PersistACL(db, externalDataSourcePath);
190+
}
191+
192+
context.SS->PersistExternalDataSource(db,
193+
externalDataSourcePathId,
194+
externalDataSourceInfo);
195+
context.SS->PersistTxState(db, OperationId);
196+
}
197+
198+
public:
199+
using TSubOperation::TSubOperation;
200+
201+
THolder<TProposeResponse> Propose(const TString& owner,
202+
TOperationContext& context) override {
203+
Y_UNUSED(owner);
204+
const auto ssId = context.SS->SelfTabletId();
205+
const TString& parentPathStr = Transaction.GetWorkingDir();
206+
const auto& externalDataSourceDescription =
207+
Transaction.GetCreateExternalDataSource();
208+
const TString& name = externalDataSourceDescription.GetName();
209+
210+
LOG_N("TAlterExternalDataSource Propose"
211+
<< ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name);
212+
213+
auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted,
214+
static_cast<ui64>(OperationId.GetTxId()),
215+
static_cast<ui64>(ssId));
216+
217+
const TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
218+
RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(result, parentPath));
219+
220+
const TString acl = Transaction.GetModifyACL().GetDiffACL();
221+
const TPath dstPath = parentPath.Child(name);
222+
223+
RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl));
224+
RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context));
225+
RETURN_RESULT_UNLESS(IsDescriptionValid(result,
226+
externalDataSourceDescription,
227+
context.SS->ExternalSourceFactory));
228+
229+
const auto oldExternalDataSourceInfo =
230+
context.SS->ExternalDataSources.Value(dstPath->PathId, nullptr);
231+
Y_ABORT_UNLESS(oldExternalDataSourceInfo);
232+
const TExternalDataSourceInfo::TPtr externalDataSourceInfo =
233+
NExternalDataSource::CreateExternalDataSource(externalDataSourceDescription,
234+
oldExternalDataSourceInfo->AlterVersion + 1);
235+
Y_ABORT_UNLESS(externalDataSourceInfo);
236+
237+
AddPathInSchemeShard(result, dstPath);
238+
const TPathElement::TPtr externalDataSource =
239+
CreateExternalDataSourcePathElement(dstPath);
240+
CreateTransaction(context, externalDataSource->PathId);
241+
242+
NIceDb::TNiceDb db(context.GetDB());
243+
244+
RegisterParentPathDependencies(context, parentPath);
245+
246+
AdvanceTransactionStateToPropose(context, db);
247+
248+
PersistExternalDataSource(
249+
context, db, externalDataSource, externalDataSourceInfo, acl);
250+
251+
IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId,
252+
dstPath,
253+
context.SS,
254+
context.OnComplete);
255+
256+
SetState(NextState());
257+
return result;
258+
}
259+
260+
void AbortPropose(TOperationContext& context) override {
261+
LOG_N("TAlterExternalDataSource AbortPropose"
262+
<< ": opId# " << OperationId);
263+
Y_ABORT("no AbortPropose for TAlterExternalDataSource");
264+
}
265+
266+
void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {
267+
LOG_N("TAlterExternalDataSource AbortUnsafe"
268+
<< ": opId# " << OperationId << ", txId# " << forceDropTxId);
269+
context.OnComplete.DoneOperation(OperationId);
270+
}
271+
};
272+
273+
} // namespace
274+
275+
namespace NKikimr::NSchemeShard {
276+
277+
ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, const TTxTransaction& tx) {
278+
return MakeSubOperation<TAlterExternalDataSource>(id, tx);
279+
}
280+
281+
ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, TTxState::ETxState state) {
282+
Y_ABORT_UNLESS(state != TTxState::Invalid);
283+
return MakeSubOperation<TAlterExternalDataSource>(id, state);
284+
}
285+
286+
}

0 commit comments

Comments
 (0)