Skip to content

Fix broken txs #8714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
}

AFL_VERIFY(TxOperator->ProgressOnExecute(*Self, NOlap::TSnapshot(step, txId), txc));
Self->ProgressTxController->FinishPlannedTx(txId, txc);
Self->ProgressTxController->ProgressOnExecute(txId, txc);
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED);
}
Self->ProgressTxInFlight = std::nullopt;
Expand All @@ -84,7 +84,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
Self->RescheduleWaitingReads();
}
if (PlannedQueueItem) {
Self->GetProgressTxController().CompleteRunningTx(*PlannedQueueItem);
Self->GetProgressTxController().ProgressOnComplete(*PlannedQueueItem);
}
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
Expand Down
41 changes: 23 additions & 18 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
const auto txKind = record.GetTxKind();
const ui64 txId = record.GetTxId();
const auto& txBody = record.GetTxBody();
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);

if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
auto proposeResult = ProposeTtlDeprecated(txBody);
Expand All @@ -51,7 +52,7 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
Self->CurrentSchemeShardId = record.GetSchemeShardId();
Schema::SaveSpecialValue(db, Schema::EValueIds::CurrentSchemeShardId, Self->CurrentSchemeShardId);
} else {
Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId());
}
}
std::optional<TMessageSeqNo> msgSeqNo;
Expand Down Expand Up @@ -79,28 +80,32 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
AFL_VERIFY(!!TxOperator);
AFL_VERIFY(!!TxInfo);
const ui64 txId = record.GetTxId();
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("request_tx", TxInfo->DebugString())(
"this", (ui64)this)("op_tx", TxOperator->GetTxInfo().DebugString());
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())(
"request_tx", TxInfo->DebugString())("this", (ui64)this)("op_tx", TxOperator->GetTxInfo().DebugString());

Self->TryRegisterMediatorTimeCast();

if (TxOperator->IsFail()) {
TxOperator->SendReply(*Self, ctx);
return;
}
auto internalOp = Self->GetProgressTxController().GetTxOperatorOptional(txId);
if (!internalOp) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "removed tx operator");
}
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
if (!internalOp->CheckTxInfoForReply(*TxInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
return;
}

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "actual tx operator");
if (internalOp->IsAsync()) {
Self->GetProgressTxController().StartProposeOnComplete(*internalOp, ctx);
} else {
auto internalOp = Self->GetProgressTxController().GetTxOperatorVerified(TxOperator->GetTxId());
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
if (!TxOperator->CheckTxInfoForReply(*TxInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
return;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "actual tx operator");
}
if (TxOperator->IsAsync()) {
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
} else {
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
}
Self->GetProgressTxController().FinishProposeOnComplete(*internalOp, ctx);
}

Self->TryRegisterMediatorTimeCast();
}

TTxType GetTxType() const override {
Expand Down
31 changes: 31 additions & 0 deletions ydb/core/tx/columnshard/normalizer/tablet/broken_txs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "broken_txs.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

namespace NKikimr::NOlap {

TConclusion<std::vector<INormalizerTask::TPtr>> TBrokenTxsNormalizer::DoInit(
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);

using namespace NColumnShard;
auto rowset = db.Table<Schema::TxInfo>().GreaterOrEqual(0).Select();
if (!rowset.IsReady()) {
return TConclusionStatus::Fail("cannot read TxInfo");
}
while (!rowset.EndOfSet()) {
const ui64 txId = rowset.GetValue<Schema::TxInfo::TxId>();
if (!rowset.HaveValue<Schema::TxInfo::TxKind>()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tx_id", txId)("event", "removed_by_normalizer")("condition", "no_kind");
Schema::EraseTxInfo(db, txId);
}

if (!rowset.Next()) {
return TConclusionStatus::Fail("cannot read TxInfo");
}
}
return std::vector<INormalizerTask::TPtr>();
}

}
35 changes: 35 additions & 0 deletions ydb/core/tx/columnshard/normalizer/tablet/broken_txs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>


namespace NKikimr::NOlap {

class TBrokenTxsNormalizer: public TNormalizationController::INormalizerComponent {
public:
static TString GetClassNameStatic() {
return "BrokenTxsNormalizer";
}
private:
class TNormalizerResult;

static const inline INormalizerComponent::TFactory::TRegistrator<TBrokenTxsNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TBrokenTxsNormalizer>(GetClassNameStatic());

public:
TBrokenTxsNormalizer(const TNormalizationController::TInitContext&) {
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return {};
}

virtual TString GetClassName() const override {
return GetClassNameStatic();
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/tablet/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
GLOBAL gc_counters.cpp
GLOBAL broken_txs.cpp
)

PEERDIR(
Expand Down
45 changes: 23 additions & 22 deletions ydb/core/tx/columnshard/transactions/tx_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,16 @@ std::optional<TTxController::TTxInfo> TTxController::PopFirstPlannedTx() {
return std::nullopt;
}

void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
void TTxController::ProgressOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
auto opIt = Operators.find(txId);
if (opIt != Operators.end()) {
Counters.OnFinishPlannedTx(opIt->second->GetOpType());
}
AFL_VERIFY(opIt != Operators.end())("tx_id", txId);
Counters.OnFinishPlannedTx(opIt->second->GetOpType());
AFL_VERIFY(Operators.erase(txId));
Schema::EraseTxInfo(db, txId);
}

void TTxController::CompleteRunningTx(const TPlanQueueItem& txItem) {
AFL_VERIFY(Operators.erase(txItem.TxId));
void TTxController::ProgressOnComplete(const TPlanQueueItem& txItem) {
AFL_VERIFY(RunningQueue.erase(txItem))("info", txItem.DebugString());
}

Expand Down Expand Up @@ -344,15 +343,12 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
}
}

void TTxController::StartProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txId);
if (auto txOperator = GetTxOperatorOptional(txId)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
txOperator->StartProposeOnComplete(Owner, ctx);
Counters.OnStartProposeOnComplete(txOperator->GetOpType());
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
}
void TTxController::StartProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txOperator.GetTxId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
txOperator.StartProposeOnComplete(Owner, ctx);
Counters.OnStartProposeOnComplete(txOperator.GetOpType());
}

void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
Expand All @@ -366,19 +362,24 @@ void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor:
}
}

void TTxController::FinishProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txOperator.GetTxId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start")("tx_info", txOperator.GetTxInfo().DebugString());
TTxController::TProposeResult proposeResult = txOperator.GetProposeStartInfoVerified();
AFL_VERIFY(!txOperator.IsFail());
txOperator.FinishProposeOnComplete(Owner, ctx);
txOperator.SendReply(Owner, ctx);
Counters.OnFinishProposeOnComplete(txOperator.GetOpType());
}

void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txId);
auto txOperator = GetTxOperatorOptional(txId);
if (!txOperator) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId);
return;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start")("tx_info", txOperator->GetTxInfo().DebugString());
TTxController::TProposeResult proposeResult = txOperator->GetProposeStartInfoVerified();
AFL_VERIFY(!txOperator->IsFail());
txOperator->FinishProposeOnComplete(Owner, ctx);
txOperator->SendReply(Owner, ctx);
Counters.OnFinishProposeOnComplete(txOperator->GetOpType());
return FinishProposeOnComplete(*txOperator, ctx);
}

void TTxController::ITransactionOperator::SwitchStateVerified(const EStatus from, const EStatus to) {
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,9 @@ class TTxController {

[[nodiscard]] std::shared_ptr<TTxController::ITransactionOperator> StartProposeOnExecute(
const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
void StartProposeOnComplete(const ui64 txId, const TActorContext& ctx);

void StartProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx);
void FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);

void FinishProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx);
void FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx);

void WriteTxOperatorInfo(NTabletFlatExecutor::TTransactionContext& txc, const ui64 txId, const TString& data) {
Expand All @@ -456,8 +455,8 @@ class TTxController {

std::optional<TTxInfo> GetFirstPlannedTx() const;
std::optional<TTxInfo> PopFirstPlannedTx();
void FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
void CompleteRunningTx(const TPlanQueueItem& tx);
void ProgressOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
void ProgressOnComplete(const TPlanQueueItem& tx);

std::optional<TPlanQueueItem> GetPlannedTx() const;
TPlanQueueItem GetFrontTx() const;
Expand Down
Loading