-
Notifications
You must be signed in to change notification settings - Fork 734
[YQ-1997] Support for ReplaceIfExists flag in SchemeShard for External Data Source and External Table #1431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
UgnineSirdis
merged 12 commits into
ydb-platform:main
from
Alnen:YQ-1997-ss-replace-support
Jan 31, 2024
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
3e756cc
Initial implementation
Alnen 84f575d
Cleaned code #1
Alnen 73b8d3c
Tests #1
Alnen b2e29bf
Fixed PR comments #1
Alnen eff009d
Fixed PR comments #2
Alnen 0d715a8
Update ydb/core/tx/schemeshard/ut_external_table/ut_external_table.cpp
Alnen 37e2203
Update ydb/core/tx/schemeshard/ut_external_table/ut_external_table.cpp
Alnen 249a4cc
Fixed PR comments #3
Alnen fc108f7
Fixed PR comments #4
Alnen 02ad968
Fixed PR comments #5
Alnen fb9d578
Merge branch 'main' into YQ-1997-ss-replace-support
Alnen b3251c7
Fixed PR comments #6
Alnen File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
286 changes: 286 additions & 0 deletions
286
ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,286 @@ | ||
| #include "schemeshard__operation_common_external_data_source.h" | ||
| #include "schemeshard__operation_part.h" | ||
| #include "schemeshard__operation_common.h" | ||
| #include "schemeshard_impl.h" | ||
|
|
||
| #include <utility> | ||
|
|
||
| namespace { | ||
|
|
||
| using namespace NKikimr; | ||
| using namespace NSchemeShard; | ||
|
|
||
| class TPropose: public TSubOperationState { | ||
| private: | ||
| const TOperationId OperationId; | ||
|
|
||
| TString DebugHint() const override { | ||
| return TStringBuilder() | ||
| << "TAlterExternalDataSource TPropose" | ||
| << ", operationId: " << OperationId; | ||
| } | ||
|
|
||
| public: | ||
| explicit TPropose(TOperationId id) | ||
| : OperationId(std::move(id)) | ||
| { | ||
| } | ||
|
|
||
| bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { | ||
| const TStepId step = TStepId(ev->Get()->StepId); | ||
|
|
||
| LOG_I(DebugHint() << "HandleReply TEvOperationPlan" | ||
| << ": step# " << step); | ||
|
|
||
| const TTxState* txState = context.SS->FindTx(OperationId); | ||
| Y_ABORT_UNLESS(txState); | ||
| Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalDataSource); | ||
|
|
||
| const auto pathId = txState->TargetPathId; | ||
| const auto path = TPath::Init(pathId, context.SS); | ||
| const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); | ||
|
|
||
| NIceDb::TNiceDb db(context.GetDB()); | ||
|
|
||
| IncParentDirAlterVersionWithRepublish(OperationId, path, context); | ||
|
|
||
| context.SS->ClearDescribePathCaches(pathPtr); | ||
| context.OnComplete.PublishToSchemeBoard(OperationId, pathId); | ||
|
|
||
| context.SS->ChangeTxState(db, OperationId, TTxState::Done); | ||
| return true; | ||
| } | ||
|
|
||
| bool ProgressState(TOperationContext& context) override { | ||
| LOG_I(DebugHint() << "ProgressState"); | ||
|
|
||
| const TTxState* txState = context.SS->FindTx(OperationId); | ||
| Y_ABORT_UNLESS(txState); | ||
| Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalDataSource); | ||
|
|
||
| context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); | ||
| return false; | ||
| } | ||
| }; | ||
|
|
||
| class TAlterExternalDataSource : public TSubOperation { | ||
| static TTxState::ETxState NextState() { return TTxState::Propose; } | ||
|
|
||
| TTxState::ETxState NextState(TTxState::ETxState state) const override { | ||
| switch (state) { | ||
| case TTxState::Waiting: | ||
| case TTxState::Propose: | ||
| return TTxState::Done; | ||
| default: | ||
| return TTxState::Invalid; | ||
| } | ||
| } | ||
|
|
||
| TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { | ||
| switch (state) { | ||
| case TTxState::Waiting: | ||
| case TTxState::Propose: | ||
| return MakeHolder<TPropose>(OperationId); | ||
| case TTxState::Done: | ||
| return MakeHolder<TDone>(OperationId); | ||
| default: | ||
| return nullptr; | ||
| } | ||
| } | ||
|
|
||
| static bool IsDestinationPathValid(const THolder<TProposeResponse>& result, | ||
| const TPath& dstPath, | ||
| const TString& acl) { | ||
| const auto checks = dstPath.Check(); | ||
| checks.IsAtLocalSchemeShard() | ||
| .IsResolved() | ||
| .NotUnderDeleting() | ||
| .FailOnWrongType(TPathElement::EPathType::EPathTypeExternalDataSource) | ||
| .IsValidLeafName() | ||
| .DepthLimit() | ||
| .PathsLimit() | ||
| .DirChildrenLimit() | ||
| .IsValidACL(acl); | ||
|
|
||
| if (!checks) { | ||
| result->SetError(checks.GetStatus(), checks.GetError()); | ||
| if (dstPath.IsResolved()) { | ||
| result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId)); | ||
| result->SetPathId(dstPath.Base()->PathId.LocalPathId); | ||
| } | ||
| } | ||
|
|
||
| return static_cast<bool>(checks); | ||
| } | ||
|
|
||
| bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result, | ||
| const TOperationContext& context) const { | ||
| TString errorMessage; | ||
| if (!context.SS->CheckApplyIf(Transaction, errorMessage)) { | ||
| result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage); | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| static bool IsDescriptionValid( | ||
| const THolder<TProposeResponse>& result, | ||
| const NKikimrSchemeOp::TExternalDataSourceDescription& desc, | ||
| const NExternalSource::IExternalSourceFactory::TPtr& factory) { | ||
| TString errorMessage; | ||
| if (!NExternalDataSource::Validate(desc, factory, errorMessage)) { | ||
| result->SetError(NKikimrScheme::StatusSchemeError, errorMessage); | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| static void AddPathInSchemeShard( | ||
| const THolder<TProposeResponse>& result, const TPath& dstPath) { | ||
| result->SetPathId(dstPath.Base()->PathId.LocalPathId); | ||
| } | ||
|
|
||
| TPathElement::TPtr ReplaceExternalDataSourcePathElement(const TPath& dstPath) const { | ||
| TPathElement::TPtr externalDataSource = dstPath.Base(); | ||
|
|
||
| externalDataSource->PathState = TPathElement::EPathState::EPathStateAlter; | ||
| externalDataSource->LastTxId = OperationId.GetTxId(); | ||
|
|
||
| return externalDataSource; | ||
| } | ||
|
|
||
| void CreateTransaction(const TOperationContext& context, | ||
| const TPathId& externalDataSourcePathId) const { | ||
| TTxState& txState = context.SS->CreateTx(OperationId, | ||
| TTxState::TxAlterExternalDataSource, | ||
| externalDataSourcePathId); | ||
| txState.Shards.clear(); | ||
| } | ||
|
|
||
| void RegisterParentPathDependencies(const TOperationContext& context, | ||
| const TPath& parentPath) const { | ||
| if (parentPath.Base()->HasActiveChanges()) { | ||
| const TTxId parentTxId = parentPath.Base()->PlannedToCreate() | ||
| ? parentPath.Base()->CreateTxId | ||
| : parentPath.Base()->LastTxId; | ||
| context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); | ||
| } | ||
| } | ||
|
|
||
| void AdvanceTransactionStateToPropose(const TOperationContext& context, | ||
| NIceDb::TNiceDb& db) const { | ||
| context.SS->ChangeTxState(db, OperationId, TTxState::Propose); | ||
| context.OnComplete.ActivateTx(OperationId); | ||
| } | ||
|
|
||
| void PersistExternalDataSource( | ||
| const TOperationContext& context, | ||
| NIceDb::TNiceDb& db, | ||
| const TPathElement::TPtr& externalDataSourcePath, | ||
| const TExternalDataSourceInfo::TPtr& externalDataSourceInfo, | ||
| const TString& acl) const { | ||
| const auto& externalDataSourcePathId = externalDataSourcePath->PathId; | ||
|
|
||
| context.SS->ExternalDataSources[externalDataSourcePathId] = externalDataSourceInfo; | ||
| context.SS->PersistPath(db, externalDataSourcePathId); | ||
|
|
||
| if (!acl.empty()) { | ||
| externalDataSourcePath->ApplyACL(acl); | ||
| context.SS->PersistACL(db, externalDataSourcePath); | ||
| } | ||
|
|
||
| context.SS->PersistExternalDataSource(db, | ||
| externalDataSourcePathId, | ||
| externalDataSourceInfo); | ||
| context.SS->PersistTxState(db, OperationId); | ||
| } | ||
|
|
||
| public: | ||
| using TSubOperation::TSubOperation; | ||
|
|
||
| THolder<TProposeResponse> Propose(const TString& owner, | ||
| TOperationContext& context) override { | ||
| Y_UNUSED(owner); | ||
| const auto ssId = context.SS->SelfTabletId(); | ||
| const TString& parentPathStr = Transaction.GetWorkingDir(); | ||
| const auto& externalDataSourceDescription = | ||
| Transaction.GetCreateExternalDataSource(); | ||
| const TString& name = externalDataSourceDescription.GetName(); | ||
|
|
||
| LOG_N("TAlterExternalDataSource Propose" | ||
| << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name); | ||
|
|
||
| auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, | ||
| static_cast<ui64>(OperationId.GetTxId()), | ||
| static_cast<ui64>(ssId)); | ||
|
|
||
| const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); | ||
| RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(result, parentPath)); | ||
|
|
||
| const TString acl = Transaction.GetModifyACL().GetDiffACL(); | ||
| const TPath dstPath = parentPath.Child(name); | ||
|
|
||
| RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl)); | ||
| RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context)); | ||
| RETURN_RESULT_UNLESS(IsDescriptionValid(result, | ||
| externalDataSourceDescription, | ||
| context.SS->ExternalSourceFactory)); | ||
|
|
||
| const auto oldExternalDataSourceInfo = | ||
| context.SS->ExternalDataSources.Value(dstPath->PathId, nullptr); | ||
| Y_ABORT_UNLESS(oldExternalDataSourceInfo); | ||
| const TExternalDataSourceInfo::TPtr externalDataSourceInfo = | ||
| NExternalDataSource::CreateExternalDataSource(externalDataSourceDescription, | ||
| oldExternalDataSourceInfo->AlterVersion + 1); | ||
| Y_ABORT_UNLESS(externalDataSourceInfo); | ||
|
|
||
| AddPathInSchemeShard(result, dstPath); | ||
| const TPathElement::TPtr externalDataSource = | ||
| ReplaceExternalDataSourcePathElement(dstPath); | ||
| CreateTransaction(context, externalDataSource->PathId); | ||
|
|
||
| NIceDb::TNiceDb db(context.GetDB()); | ||
|
|
||
| RegisterParentPathDependencies(context, parentPath); | ||
|
|
||
| AdvanceTransactionStateToPropose(context, db); | ||
|
|
||
| PersistExternalDataSource( | ||
| context, db, externalDataSource, externalDataSourceInfo, acl); | ||
|
|
||
| IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, | ||
| dstPath, | ||
| context.SS, | ||
| context.OnComplete); | ||
|
|
||
| SetState(NextState()); | ||
| return result; | ||
| } | ||
|
|
||
| void AbortPropose(TOperationContext& context) override { | ||
| LOG_N("TAlterExternalDataSource AbortPropose" | ||
| << ": opId# " << OperationId); | ||
| Y_ABORT("no AbortPropose for TAlterExternalDataSource"); | ||
| } | ||
|
|
||
| void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { | ||
| LOG_N("TAlterExternalDataSource AbortUnsafe" | ||
| << ": opId# " << OperationId << ", txId# " << forceDropTxId); | ||
| context.OnComplete.DoneOperation(OperationId); | ||
| } | ||
| }; | ||
|
|
||
| } // namespace | ||
|
|
||
| namespace NKikimr::NSchemeShard { | ||
|
|
||
| ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, const TTxTransaction& tx) { | ||
| return MakeSubOperation<TAlterExternalDataSource>(id, tx); | ||
| } | ||
|
|
||
| ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, TTxState::ETxState state) { | ||
| Y_ABORT_UNLESS(state != TTxState::Invalid); | ||
| return MakeSubOperation<TAlterExternalDataSource>(id, state); | ||
| } | ||
|
|
||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.