Skip to content

tx operations start unification #7178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}

void TTxInit::Complete(const TActorContext& ctx) {
Self->ProgressTxController->StartOperators();
Self->ProgressTxController->OnTabletInit();
Self->SwitchToWork(ctx);
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace NKikimr::NColumnShard {
public:
using TBase::TBase;

void OnTabletInit(TColumnShard& owner) override {
virtual void DoOnTabletInit(TColumnShard& owner) override {
for (auto&& writeId : WriteIds) {
AFL_VERIFY(owner.LongTxWrites.contains(writeId))("problem", "ltx_not_exists_for_write_id")("txId", GetTxId())("writeId", (ui64)writeId);
owner.AddLongTxWrite(writeId, GetTxId());
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/transactions/operators/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ NKikimr::TConclusionStatus TSchemaTransactionOperator::ValidateTables(::google::
} return TConclusionStatus::Success();
}

bool TSchemaTransactionOperator::DoOnStartAsync(TColumnShard& owner) {
void TSchemaTransactionOperator::DoOnTabletInit(TColumnShard& owner) {
AFL_VERIFY(WaitPathIdsToErase.empty());
switch (SchemaTxBody.TxBody_case()) {
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
Expand All @@ -190,11 +190,9 @@ bool TSchemaTransactionOperator::DoOnStartAsync(TColumnShard& owner) {
if (WaitPathIdsToErase.size()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "wait_remove_path_id")("pathes", JoinSeq(",", WaitPathIdsToErase))("tx_id", GetTxId());
owner.Subscribers->RegisterSubscriber(std::make_shared<TWaitEraseTablesTxSubscriber>(WaitPathIdsToErase, GetTxId()));
return true;
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "remove_pathes_cleaned")("tx_id", GetTxId());
owner.Execute(new TTxFinishAsyncTransaction(owner, GetTxId()));
return false;
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/transactions/operators/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TSchemaTransactionOperator: public IProposeTxOperator {
THashSet<TActorId> NotifySubscribers;
THashSet<ui64> WaitPathIdsToErase;

virtual bool DoOnStartAsync(TColumnShard& owner) override;
virtual void DoOnTabletInit(TColumnShard& owner) override;

template <class TInfoProto>
THashSet<ui64> GetNotErasedTableIds(const TColumnShard& owner, const TInfoProto& tables) const {
Expand Down
24 changes: 8 additions & 16 deletions ydb/core/tx/columnshard/transactions/tx_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,17 @@ TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const std::shared_p
return txInfo;
}

bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
auto opIt = Operators.find(txId);
bool TTxController::AbortTx(const TPlanQueueItem planQueueItem, NTabletFlatExecutor::TTransactionContext& txc) {
auto opIt = Operators.find(planQueueItem.TxId);
Y_ABORT_UNLESS(opIt != Operators.end());
Y_ABORT_UNLESS(opIt->second->GetTxInfo().PlanStep == 0);
opIt->second->ExecuteOnAbort(Owner, txc);
opIt->second->CompleteOnAbort(Owner, NActors::TActivationContext::AsActorContext());

if (opIt->second->GetTxInfo().MaxStep != Max<ui64>()) {
DeadlineQueue.erase(TPlanQueueItem(opIt->second->GetTxInfo().MaxStep, txId));
}
Operators.erase(txId);
AFL_VERIFY(Operators.erase(planQueueItem.TxId));
AFL_VERIFY(DeadlineQueue.erase(planQueueItem));
NIceDb::TNiceDb db(txc.DB);
Schema::EraseTxInfo(db, txId);
Schema::EraseTxInfo(db, planQueueItem.TxId);
return true;
}

Expand Down Expand Up @@ -240,7 +238,7 @@ size_t TTxController::CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext&
}
ui64 txId = it->TxId;
LOG_S_DEBUG(TStringBuilder() << "Removing outdated txId " << txId << " max step " << it->Step << " outdated step ");
AbortTx(txId, txc);
AbortTx(*it, txc);
++removedCount;
}
}
Expand Down Expand Up @@ -280,6 +278,8 @@ TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64
}

void TTxController::OnTabletInit() {
AFL_VERIFY(!StartedFlag);
StartedFlag = true;
for (auto&& txOperator : Operators) {
txOperator.second->OnTabletInit(Owner);
}
Expand Down Expand Up @@ -361,14 +361,6 @@ void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext
txOperator->SendReply(Owner, ctx);
}

void TTxController::StartOperators() {
AFL_VERIFY(!StartedFlag);
StartedFlag = true;
for (auto&& i : Operators) {
Y_UNUSED(i.second->OnStartAsync(Owner));
}
}

void TTxController::ITransactionOperator::SwitchStateVerified(const EStatus from, const EStatus to) {
AFL_VERIFY(!Status || *Status == from)("error", "incorrect expected status")("real_state", *Status)("expected", from)("details", DebugString());
Status = to;
Expand Down
22 changes: 10 additions & 12 deletions ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,15 @@ class TTxController {
return TxInfo;
}

virtual void DoOnTabletInit(TColumnShard& /*owner*/) {

}

void ResetStatusOnUpdate() {
Status = {};
}

virtual TString DoDebugString() const = 0;
virtual bool DoOnStartAsync(TColumnShard& /*owner*/) {
return false;
}

std::optional<bool> StartedAsync;

Expand All @@ -222,12 +223,6 @@ class TTxController {
return DoCheckTxInfoForReply(originalTxInfo);
}

[[nodiscard]] bool OnStartAsync(TColumnShard& owner) {
AFL_VERIFY(!StartedAsync);
StartedAsync = DoOnStartAsync(owner);
return *StartedAsync;
}

TString DebugString() const {
return DoDebugString();
}
Expand Down Expand Up @@ -345,7 +340,11 @@ class TTxController {
virtual void RegisterSubscriber(const TActorId&) {
AFL_VERIFY(false)("message", "Not implemented");
};
virtual void OnTabletInit(TColumnShard& /*owner*/) {}
void OnTabletInit(TColumnShard& owner) {
AFL_VERIFY(!StartedAsync);
StartedAsync = true;
DoOnTabletInit(owner);
}
};

private:
Expand All @@ -359,7 +358,7 @@ class TTxController {

private:
ui64 GetAllowedStep() const;
bool AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
bool AbortTx(const TPlanQueueItem planQueueItem, NTabletFlatExecutor::TTransactionContext& txc);

TTxInfo RegisterTx(const std::shared_ptr<TTxController::ITransactionOperator>& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
TTxInfo RegisterTxWithDeadline(const std::shared_ptr<TTxController::ITransactionOperator>& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
Expand All @@ -369,7 +368,6 @@ class TTxController {

ITransactionOperator::TPtr GetTxOperator(const ui64 txId) const;
ITransactionOperator::TPtr GetVerifiedTxOperator(const ui64 txId) const;
void StartOperators();

ui64 GetMemoryUsage() const;
bool HaveOutdatedTxs() const;
Expand Down
Loading