Skip to content

Commit 4cccbf8

Browse files
prevent problem with same request in one transaction pack
don't apply actions for backup and sharing in case same request
1 parent 558c16c commit 4cccbf8

File tree

8 files changed

+50
-19
lines changed

8 files changed

+50
-19
lines changed

ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ std::unique_ptr<NKikimr::NTabletFlatExecutor::ITransaction> TSessionsManager::Tx
8787
return std::make_unique<TTxRemoveSession>(className, identifier, Adapter, Storage);
8888
}
8989

90+
bool TSessionsManager::HasTask(const TTask& task) const {
91+
return Storage->GetSession(task.GetDescriptionContainer().GetClassName(), task.GetIdentifier());
92+
}
93+
9094
}

ydb/core/tx/columnshard/bg_tasks/manager/manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class TSessionsManager {
3939
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> TxApplyControlFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& controlProto);
4040
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> TxApplyControl(const TSessionControlContainer& control);
4141

42+
bool HasTask(const TTask& task) const;
43+
4244
void Start() {
4345
AFL_VERIFY(!Finished);
4446
AFL_VERIFY(!Started);

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,17 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
6969
AFL_VERIFY(!!TxOperator);
7070
const ui64 txId = record.GetTxId();
7171
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);
72-
if (Ev->Sender != TxOperator->GetTxInfo().Source) {
73-
return;
74-
}
7572
if (TxOperator->IsFail()) {
7673
TxOperator->SendReply(*Self, ctx);
77-
} else if (TxOperator->IsAsync()) {
78-
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
7974
} else {
80-
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
75+
if (!Self->GetProgressTxController()->IsActualOperator(TxOperator)) {
76+
return;
77+
}
78+
if (TxOperator->IsAsync()) {
79+
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
80+
} else {
81+
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
82+
}
8183
}
8284

8385
Self->TryRegisterMediatorTimeCast();

ydb/core/tx/columnshard/transactions/operators/backup.cpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,32 @@ bool TBackupTransactionOperator::DoParse(TColumnShard& owner, const TString& dat
3131
NArrow::NSerialization::TSerializerContainer serializer(std::make_shared<NArrow::NSerialization::TNativeSerializer>());
3232
ExportTask = std::make_shared<NOlap::NExport::TExportTask>(id.DetachResult(), selector.DetachResult(), storeInitializer.DetachResult(), serializer, GetTxId());
3333
NOlap::NBackground::TTask task(::ToString(ExportTask->GetIdentifier().GetPathId()), std::make_shared<NOlap::NBackground::TFakeStatusChannel>(), ExportTask);
34-
TxAddTask = owner.GetBackgroundSessionsManager()->TxAddTask(task);
35-
if (!TxAddTask) {
36-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_add_task");
37-
return false;
34+
if (!owner.GetBackgroundSessionsManager()->HasTask(task)) {
35+
TxAddTask = owner.GetBackgroundSessionsManager()->TxAddTask(task);
36+
if (!TxAddTask) {
37+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_add_task");
38+
return false;
39+
}
40+
} else {
41+
TaskExists = true;
3842
}
3943
return true;
4044
}
4145

4246
TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) {
43-
AFL_VERIFY(!!TxAddTask);
44-
AFL_VERIFY(TxAddTask->Execute(txc, NActors::TActivationContext::AsActorContext()));
47+
if (!TaskExists) {
48+
AFL_VERIFY(!!TxAddTask);
49+
AFL_VERIFY(TxAddTask->Execute(txc, NActors::TActivationContext::AsActorContext()));
50+
}
4551
return TProposeResult();
4652
}
4753

4854
void TBackupTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) {
49-
AFL_VERIFY(!!TxAddTask);
50-
TxAddTask->Complete(ctx);
51-
TxAddTask.reset();
55+
if (!TaskExists) {
56+
AFL_VERIFY(!!TxAddTask);
57+
TxAddTask->Complete(ctx);
58+
TxAddTask.reset();
59+
}
5260
}
5361

5462
bool TBackupTransactionOperator::ExecuteOnProgress(TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {

ydb/core/tx/columnshard/transactions/operators/backup.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class TBackupTransactionOperator: public IProposeTxOperator {
1111
using TBase = IProposeTxOperator;
1212

1313
std::shared_ptr<NOlap::NExport::TExportTask> ExportTask;
14+
bool TaskExists = false;
1415
std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAddTask;
1516
std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;
1617
std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAbort;

ydb/core/tx/columnshard/transactions/operators/sharing.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ bool TSharingTransactionOperator::DoParse(TColumnShard& owner, const TString& da
2323

2424
auto currentSession = SharingSessionsManager->GetDestinationSession(SharingTask->GetSessionId());
2525
if (currentSession) {
26+
SessionExistsFlag = true;
2627
SharingTask = currentSession;
2728
} else {
2829
SharingTask->Confirm();
@@ -34,14 +35,18 @@ bool TSharingTransactionOperator::DoParse(TColumnShard& owner, const TString& da
3435
}
3536

3637
TSharingTransactionOperator::TProposeResult TSharingTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) {
37-
AFL_VERIFY(!!TxPropose);
38-
AFL_VERIFY(TxPropose->Execute(txc, NActors::TActivationContext::AsActorContext()));
38+
if (!SessionExistsFlag) {
39+
AFL_VERIFY(!!TxPropose);
40+
AFL_VERIFY(TxPropose->Execute(txc, NActors::TActivationContext::AsActorContext()));
41+
}
3942
return TProposeResult();
4043
}
4144

4245
void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) {
43-
AFL_VERIFY(!!TxPropose);
44-
TxPropose->Complete(ctx);
46+
if (!SessionExistsFlag) {
47+
AFL_VERIFY(!!TxPropose);
48+
TxPropose->Complete(ctx);
49+
}
4550
TxPropose.release();
4651
}
4752

ydb/core/tx/columnshard/transactions/operators/sharing.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class TSharingTransactionOperator: public IProposeTxOperator {
1212

1313
std::shared_ptr<NOlap::NDataSharing::TSessionsManager> SharingSessionsManager;
1414
std::shared_ptr<NOlap::NDataSharing::TDestinationSession> SharingTask;
15+
bool SessionExistsFlag = false;
1516
using TProposeResult = TTxController::TProposeResult;
1617
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxPropose;
1718
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;

ydb/core/tx/columnshard/transactions/tx_controller.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,14 @@ class TTxController {
332332
ui64 GetMemoryUsage() const;
333333
bool HaveOutdatedTxs() const;
334334

335+
bool IsActualOperator(const std::shared_ptr<TTxController::ITransactionOperator>& op) const {
336+
auto opActual = GetTxOperator(op->GetTxId());
337+
if (!opActual || (ui64)opActual.get() != (ui64)op.get()) {
338+
return false;
339+
}
340+
return true;
341+
}
342+
335343
bool Load(NTabletFlatExecutor::TTransactionContext& txc);
336344

337345
[[nodiscard]] std::shared_ptr<TTxController::ITransactionOperator> UpdateTxSourceInfo(const TFullTxInfo& tx, NTabletFlatExecutor::TTransactionContext& txc);

0 commit comments

Comments
 (0)