Skip to content

Commit b524a82

Browse files
timeout for long_tx operations (#7961)
1 parent 42003b3 commit b524a82

File tree

3 files changed

+39
-13
lines changed

3 files changed

+39
-13
lines changed

ydb/core/tx/columnshard/columnshard__progress_tx.cpp

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,39 @@
22
#include "columnshard_schema.h"
33

44
#include <ydb/core/tx/columnshard/operations/write.h>
5+
56
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
67

78
namespace NKikimr::NColumnShard {
89

9-
class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
10+
class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
11+
private:
12+
bool AbortedThroughRemoveExpired = false;
13+
TTxController::ITransactionOperator::TPtr TxOperator;
14+
const ui32 TabletTxNo;
15+
std::optional<NOlap::TSnapshot> LastCompletedTx;
16+
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
17+
1018
public:
1119
TTxProgressTx(TColumnShard* self)
1220
: TTransactionBase(self)
13-
, TabletTxNo(++Self->TabletTxCounter)
14-
{}
21+
, TabletTxNo(++Self->TabletTxCounter) {
22+
}
1523

16-
TTxType GetTxType() const override { return TXTYPE_PROGRESS; }
24+
TTxType GetTxType() const override {
25+
return TXTYPE_PROGRESS;
26+
}
1727

1828
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
19-
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
29+
NActors::TLogContextGuard logGuard =
30+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
2031
Y_ABORT_UNLESS(Self->ProgressTxInFlight);
2132
Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds());
2233

2334
const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc);
2435
if (removedCount > 0) {
2536
// We cannot continue with this transaction, start a new transaction
37+
AbortedThroughRemoveExpired = true;
2638
Self->Execute(new TTxProgressTx(Self), ctx);
2739
return true;
2840
}
@@ -49,7 +61,11 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
4961
}
5062

5163
void Complete(const TActorContext& ctx) override {
52-
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
64+
if (AbortedThroughRemoveExpired) {
65+
return;
66+
}
67+
NActors::TLogContextGuard logGuard =
68+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
5369
if (TxOperator) {
5470
TxOperator->ProgressOnComplete(*Self, ctx);
5571
Self->RescheduleWaitingReads();
@@ -66,12 +82,6 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
6682
}
6783
Self->SetupIndexation();
6884
}
69-
70-
private:
71-
TTxController::ITransactionOperator::TPtr TxOperator;
72-
const ui32 TabletTxNo;
73-
std::optional<NOlap::TSnapshot> LastCompletedTx;
74-
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
7585
};
7686

7787
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
@@ -102,4 +112,4 @@ void TColumnShard::Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev,
102112
// For now do not return result for not finished tx. It would be sent in TTxProgressTx::Complete()
103113
}
104114

105-
}
115+
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/transactions/operators/long_tx_write.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ namespace NKikimr::NColumnShard {
1616
return "LONG_TX_WRITE";
1717
}
1818

19+
bool TxWithDeadline() const override {
20+
return true;
21+
}
22+
1923
virtual TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;
2024
virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
2125

ydb/core/tx/columnshard/transactions/tx_controller.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
4646
return false;
4747
}
4848

49+
ui32 countWithDeadline = 0;
50+
ui32 countOverrideDeadline = 0;
51+
ui32 countNoDeadline = 0;
4952
while (!rowset.EndOfSet()) {
5053
const ui64 txId = rowset.GetValue<Schema::TxInfo::TxId>();
5154
const NKikimrTxColumnShard::ETransactionKind txKind = rowset.GetValue<Schema::TxInfo::TxKind>();
@@ -58,6 +61,13 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
5861
txInfo.MaxStep = rowset.GetValue<Schema::TxInfo::MaxStep>();
5962
if (txInfo.MaxStep != Max<ui64>()) {
6063
txInfo.MinStep = txInfo.MaxStep - MaxCommitTxDelay.MilliSeconds();
64+
++countWithDeadline;
65+
} else if (txOperator->TxWithDeadline()) {
66+
txInfo.MinStep = GetAllowedStep();
67+
txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds();
68+
++countOverrideDeadline;
69+
} else {
70+
++countNoDeadline;
6171
}
6272
txInfo.PlanStep = rowset.GetValueOrDefault<Schema::TxInfo::PlanStep>(0);
6373
txInfo.Source = rowset.GetValue<Schema::TxInfo::Source>();
@@ -75,6 +85,8 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
7585
return false;
7686
}
7787
}
88+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)(
89+
"operators", Operators.size())("plan", PlanQueue.size())("dl_queue", DeadlineQueue.size());
7890
return true;
7991
}
8092

0 commit comments

Comments
 (0)