Skip to content

Commit fd5e409

Browse files
Merge 4041946 into 96140d5
2 parents 96140d5 + 4041946 commit fd5e409

File tree

6 files changed

+21
-34
lines changed

6 files changed

+21
-34
lines changed

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,6 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
250250
}
251251

252252
void TTxInit::Complete(const TActorContext& ctx) {
253-
Self->ProgressTxController->StartOperators();
254253
Self->ProgressTxController->OnTabletInit();
255254
Self->SwitchToWork(ctx);
256255
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);

ydb/core/tx/columnshard/transactions/operators/long_tx_write.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace NKikimr::NColumnShard {
3737
public:
3838
using TBase::TBase;
3939

40-
void OnTabletInit(TColumnShard& owner) override {
40+
virtual void DoOnTabletInit(TColumnShard& owner) override {
4141
for (auto&& writeId : WriteIds) {
4242
AFL_VERIFY(owner.LongTxWrites.contains(writeId))("problem", "ltx_not_exists_for_write_id")("txId", GetTxId())("writeId", (ui64)writeId);
4343
owner.AddLongTxWrite(writeId, GetTxId());

ydb/core/tx/columnshard/transactions/operators/schema.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ NKikimr::TConclusionStatus TSchemaTransactionOperator::ValidateTables(::google::
166166
} return TConclusionStatus::Success();
167167
}
168168

169-
bool TSchemaTransactionOperator::DoOnStartAsync(TColumnShard& owner) {
169+
void TSchemaTransactionOperator::DoOnTabletInit(TColumnShard& owner) {
170170
AFL_VERIFY(WaitPathIdsToErase.empty());
171171
switch (SchemaTxBody.TxBody_case()) {
172172
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
@@ -190,11 +190,9 @@ bool TSchemaTransactionOperator::DoOnStartAsync(TColumnShard& owner) {
190190
if (WaitPathIdsToErase.size()) {
191191
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "wait_remove_path_id")("pathes", JoinSeq(",", WaitPathIdsToErase))("tx_id", GetTxId());
192192
owner.Subscribers->RegisterSubscriber(std::make_shared<TWaitEraseTablesTxSubscriber>(WaitPathIdsToErase, GetTxId()));
193-
return true;
194193
} else {
195194
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "remove_pathes_cleaned")("tx_id", GetTxId());
196195
owner.Execute(new TTxFinishAsyncTransaction(owner, GetTxId()));
197-
return false;
198196
}
199197
}
200198

ydb/core/tx/columnshard/transactions/operators/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class TSchemaTransactionOperator: public IProposeTxOperator {
1818
THashSet<TActorId> NotifySubscribers;
1919
THashSet<ui64> WaitPathIdsToErase;
2020

21-
virtual bool DoOnStartAsync(TColumnShard& owner) override;
21+
virtual void DoOnTabletInit(TColumnShard& owner) override;
2222

2323
template <class TInfoProto>
2424
THashSet<ui64> GetNotErasedTableIds(const TColumnShard& owner, const TInfoProto& tables) const {

ydb/core/tx/columnshard/transactions/tx_controller.cpp

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,17 @@ TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const std::shared_p
128128
return txInfo;
129129
}
130130

131-
bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
132-
auto opIt = Operators.find(txId);
131+
bool TTxController::AbortTx(const TPlanQueueItem planQueueItem, NTabletFlatExecutor::TTransactionContext& txc) {
132+
auto opIt = Operators.find(planQueueItem.TxId);
133133
Y_ABORT_UNLESS(opIt != Operators.end());
134134
Y_ABORT_UNLESS(opIt->second->GetTxInfo().PlanStep == 0);
135135
opIt->second->ExecuteOnAbort(Owner, txc);
136136
opIt->second->CompleteOnAbort(Owner, NActors::TActivationContext::AsActorContext());
137137

138-
if (opIt->second->GetTxInfo().MaxStep != Max<ui64>()) {
139-
DeadlineQueue.erase(TPlanQueueItem(opIt->second->GetTxInfo().MaxStep, txId));
140-
}
141-
Operators.erase(txId);
138+
AFL_VERIFY(Operators.erase(planQueueItem.TxId));
139+
AFL_VERIFY(DeadlineQueue.erase(planQueueItem));
142140
NIceDb::TNiceDb db(txc.DB);
143-
Schema::EraseTxInfo(db, txId);
141+
Schema::EraseTxInfo(db, planQueueItem.TxId);
144142
return true;
145143
}
146144

@@ -240,7 +238,7 @@ size_t TTxController::CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext&
240238
}
241239
ui64 txId = it->TxId;
242240
LOG_S_DEBUG(TStringBuilder() << "Removing outdated txId " << txId << " max step " << it->Step << " outdated step ");
243-
AbortTx(txId, txc);
241+
AbortTx(*it, txc);
244242
++removedCount;
245243
}
246244
}
@@ -280,6 +278,8 @@ TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64
280278
}
281279

282280
void TTxController::OnTabletInit() {
281+
AFL_VERIFY(!StartedFlag);
282+
StartedFlag = true;
283283
for (auto&& txOperator : Operators) {
284284
txOperator.second->OnTabletInit(Owner);
285285
}
@@ -361,14 +361,6 @@ void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext
361361
txOperator->SendReply(Owner, ctx);
362362
}
363363

364-
void TTxController::StartOperators() {
365-
AFL_VERIFY(!StartedFlag);
366-
StartedFlag = true;
367-
for (auto&& i : Operators) {
368-
Y_UNUSED(i.second->OnStartAsync(Owner));
369-
}
370-
}
371-
372364
void TTxController::ITransactionOperator::SwitchStateVerified(const EStatus from, const EStatus to) {
373365
AFL_VERIFY(!Status || *Status == from)("error", "incorrect expected status")("real_state", *Status)("expected", from)("details", DebugString());
374366
Status = to;

ydb/core/tx/columnshard/transactions/tx_controller.h

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,15 @@ class TTxController {
203203
return TxInfo;
204204
}
205205

206+
virtual void DoOnTabletInit(TColumnShard& /*owner*/) {
207+
208+
}
209+
206210
void ResetStatusOnUpdate() {
207211
Status = {};
208212
}
209213

210214
virtual TString DoDebugString() const = 0;
211-
virtual bool DoOnStartAsync(TColumnShard& /*owner*/) {
212-
return false;
213-
}
214215

215216
std::optional<bool> StartedAsync;
216217

@@ -222,12 +223,6 @@ class TTxController {
222223
return DoCheckTxInfoForReply(originalTxInfo);
223224
}
224225

225-
[[nodiscard]] bool OnStartAsync(TColumnShard& owner) {
226-
AFL_VERIFY(!StartedAsync);
227-
StartedAsync = DoOnStartAsync(owner);
228-
return *StartedAsync;
229-
}
230-
231226
TString DebugString() const {
232227
return DoDebugString();
233228
}
@@ -345,7 +340,11 @@ class TTxController {
345340
virtual void RegisterSubscriber(const TActorId&) {
346341
AFL_VERIFY(false)("message", "Not implemented");
347342
};
348-
virtual void OnTabletInit(TColumnShard& /*owner*/) {}
343+
void OnTabletInit(TColumnShard& owner) {
344+
AFL_VERIFY(!StartedAsync);
345+
StartedAsync = true;
346+
DoOnTabletInit(owner);
347+
}
349348
};
350349

351350
private:
@@ -359,7 +358,7 @@ class TTxController {
359358

360359
private:
361360
ui64 GetAllowedStep() const;
362-
bool AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
361+
bool AbortTx(const TPlanQueueItem planQueueItem, NTabletFlatExecutor::TTransactionContext& txc);
363362

364363
TTxInfo RegisterTx(const std::shared_ptr<TTxController::ITransactionOperator>& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
365364
TTxInfo RegisterTxWithDeadline(const std::shared_ptr<TTxController::ITransactionOperator>& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
@@ -369,7 +368,6 @@ class TTxController {
369368

370369
ITransactionOperator::TPtr GetTxOperator(const ui64 txId) const;
371370
ITransactionOperator::TPtr GetVerifiedTxOperator(const ui64 txId) const;
372-
void StartOperators();
373371

374372
ui64 GetMemoryUsage() const;
375373
bool HaveOutdatedTxs() const;

0 commit comments

Comments
 (0)