Skip to content

Commit a0bfcd6

Browse files
Merge 4cccbf8 into d5f8f4e
2 parents d5f8f4e + 4cccbf8 commit a0bfcd6

File tree

12 files changed

+77
-20
lines changed

12 files changed

+77
-20
lines changed

ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ std::unique_ptr<NKikimr::NTabletFlatExecutor::ITransaction> TSessionsManager::Tx
8787
return std::make_unique<TTxRemoveSession>(className, identifier, Adapter, Storage);
8888
}
8989

90+
bool TSessionsManager::HasTask(const TTask& task) const {
91+
return Storage->GetSession(task.GetDescriptionContainer().GetClassName(), task.GetIdentifier());
92+
}
93+
9094
}

ydb/core/tx/columnshard/bg_tasks/manager/manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class TSessionsManager {
3939
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> TxApplyControlFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& controlProto);
4040
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> TxApplyControl(const TSessionControlContainer& control);
4141

42+
bool HasTask(const TTask& task) const;
43+
4244
void Start() {
4345
AFL_VERIFY(!Finished);
4446
AFL_VERIFY(!Started);

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
2929
const auto txKind = record.GetTxKind();
3030
const ui64 txId = record.GetTxId();
3131
const auto& txBody = record.GetTxBody();
32+
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);
3233

3334
if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
3435
auto proposeResult = ProposeTtlDeprecated(txBody);
@@ -67,15 +68,18 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
6768
}
6869
AFL_VERIFY(!!TxOperator);
6970
const ui64 txId = record.GetTxId();
70-
if (Ev->Sender != TxOperator->GetTxInfo().Source) {
71-
return;
72-
}
71+
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);
7372
if (TxOperator->IsFail()) {
7473
TxOperator->SendReply(*Self, ctx);
75-
} else if (TxOperator->IsAsync()) {
76-
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
7774
} else {
78-
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
75+
if (!Self->GetProgressTxController()->IsActualOperator(TxOperator)) {
76+
return;
77+
}
78+
if (TxOperator->IsAsync()) {
79+
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
80+
} else {
81+
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
82+
}
7983
}
8084

8185
Self->TryRegisterMediatorTimeCast();

ydb/core/tx/columnshard/transactions/operators/backup.cpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,32 @@ bool TBackupTransactionOperator::DoParse(TColumnShard& owner, const TString& dat
3131
NArrow::NSerialization::TSerializerContainer serializer(std::make_shared<NArrow::NSerialization::TNativeSerializer>());
3232
ExportTask = std::make_shared<NOlap::NExport::TExportTask>(id.DetachResult(), selector.DetachResult(), storeInitializer.DetachResult(), serializer, GetTxId());
3333
NOlap::NBackground::TTask task(::ToString(ExportTask->GetIdentifier().GetPathId()), std::make_shared<NOlap::NBackground::TFakeStatusChannel>(), ExportTask);
34-
TxAddTask = owner.GetBackgroundSessionsManager()->TxAddTask(task);
35-
if (!TxAddTask) {
36-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_add_task");
37-
return false;
34+
if (!owner.GetBackgroundSessionsManager()->HasTask(task)) {
35+
TxAddTask = owner.GetBackgroundSessionsManager()->TxAddTask(task);
36+
if (!TxAddTask) {
37+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_add_task");
38+
return false;
39+
}
40+
} else {
41+
TaskExists = true;
3842
}
3943
return true;
4044
}
4145

4246
TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) {
43-
AFL_VERIFY(!!TxAddTask);
44-
AFL_VERIFY(TxAddTask->Execute(txc, NActors::TActivationContext::AsActorContext()));
47+
if (!TaskExists) {
48+
AFL_VERIFY(!!TxAddTask);
49+
AFL_VERIFY(TxAddTask->Execute(txc, NActors::TActivationContext::AsActorContext()));
50+
}
4551
return TProposeResult();
4652
}
4753

4854
void TBackupTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) {
49-
AFL_VERIFY(!!TxAddTask);
50-
TxAddTask->Complete(ctx);
51-
TxAddTask.reset();
55+
if (!TaskExists) {
56+
AFL_VERIFY(!!TxAddTask);
57+
TxAddTask->Complete(ctx);
58+
TxAddTask.reset();
59+
}
5260
}
5361

5462
bool TBackupTransactionOperator::ExecuteOnProgress(TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {

ydb/core/tx/columnshard/transactions/operators/backup.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class TBackupTransactionOperator: public IProposeTxOperator {
1111
using TBase = IProposeTxOperator;
1212

1313
std::shared_ptr<NOlap::NExport::TExportTask> ExportTask;
14+
bool TaskExists = false;
1415
std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAddTask;
1516
std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;
1617
std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAbort;
@@ -27,6 +28,10 @@ class TBackupTransactionOperator: public IProposeTxOperator {
2728
return true;
2829
}
2930
virtual bool DoParse(TColumnShard& owner, const TString& data) override;
31+
virtual TString DoDebugString() const override {
32+
return "BACKUP";
33+
}
34+
3035
public:
3136
using TBase::TBase;
3237

ydb/core/tx/columnshard/transactions/operators/ev_write.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ namespace NKikimr::NColumnShard {
2626
virtual bool DoCheckAllowUpdate(const TFullTxInfo& currentTxInfo) const override {
2727
return (currentTxInfo.Source == GetTxInfo().Source && currentTxInfo.Cookie == GetTxInfo().Cookie);
2828
}
29+
virtual TString DoDebugString() const override {
30+
return "EV_WRITE";
31+
}
2932
virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override {
3033
const auto& txInfo = GetTxInfo();
3134
std::unique_ptr<NActors::IEventBase> evResult;

ydb/core/tx/columnshard/transactions/operators/long_tx_write.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ namespace NKikimr::NColumnShard {
1212
static inline auto Registrator = TFactory::TRegistrator<TLongTxTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT);
1313

1414
private:
15+
virtual TString DoDebugString() const override {
16+
return "LONG_TX_WRITE";
17+
}
18+
1519
virtual TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;
1620
virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
1721

ydb/core/tx/columnshard/transactions/operators/schema.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ namespace NKikimr::NColumnShard {
101101
}
102102
return ValidateTableSchema(preset.GetSchema());
103103
}
104+
virtual TString DoDebugString() const override {
105+
return "SCHEME:" + SchemaTxBody.DebugString();
106+
}
104107

105108
private:
106109
NKikimrTxColumnShard::TSchemaTxBody SchemaTxBody;

ydb/core/tx/columnshard/transactions/operators/sharing.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ bool TSharingTransactionOperator::DoParse(TColumnShard& owner, const TString& da
2323

2424
auto currentSession = SharingSessionsManager->GetDestinationSession(SharingTask->GetSessionId());
2525
if (currentSession) {
26+
SessionExistsFlag = true;
2627
SharingTask = currentSession;
2728
} else {
2829
SharingTask->Confirm();
@@ -34,14 +35,18 @@ bool TSharingTransactionOperator::DoParse(TColumnShard& owner, const TString& da
3435
}
3536

3637
TSharingTransactionOperator::TProposeResult TSharingTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) {
37-
AFL_VERIFY(!!TxPropose);
38-
AFL_VERIFY(TxPropose->Execute(txc, NActors::TActivationContext::AsActorContext()));
38+
if (!SessionExistsFlag) {
39+
AFL_VERIFY(!!TxPropose);
40+
AFL_VERIFY(TxPropose->Execute(txc, NActors::TActivationContext::AsActorContext()));
41+
}
3942
return TProposeResult();
4043
}
4144

4245
void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) {
43-
AFL_VERIFY(!!TxPropose);
44-
TxPropose->Complete(ctx);
46+
if (!SessionExistsFlag) {
47+
AFL_VERIFY(!!TxPropose);
48+
TxPropose->Complete(ctx);
49+
}
4550
TxPropose.release();
4651
}
4752

ydb/core/tx/columnshard/transactions/operators/sharing.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class TSharingTransactionOperator: public IProposeTxOperator {
1212

1313
std::shared_ptr<NOlap::NDataSharing::TSessionsManager> SharingSessionsManager;
1414
std::shared_ptr<NOlap::NDataSharing::TDestinationSession> SharingTask;
15+
bool SessionExistsFlag = false;
1516
using TProposeResult = TTxController::TProposeResult;
1617
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxPropose;
1718
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;
@@ -28,6 +29,10 @@ class TSharingTransactionOperator: public IProposeTxOperator {
2829
return true;
2930
}
3031
virtual bool DoParse(TColumnShard& owner, const TString& data) override;
32+
virtual TString DoDebugString() const override {
33+
return "SHARING";
34+
}
35+
3136
public:
3237
using TBase::TBase;
3338
virtual void RegisterSubscriber(const TActorId& actorId) override {

0 commit comments

Comments
 (0)