Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace NKikimr::NColumnShard {

class TColumnShard;

class TTxBlobsWritingFinished: public TExtendedTransactionBase<TColumnShard> {
class TTxBlobsWritingFinished: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
std::vector<TInsertedPortions> Packs;
const std::shared_ptr<NOlap::IBlobsWritingAction> WritingActions;
std::optional<NOlap::TSnapshot> CommitSnapshot;
Expand Down Expand Up @@ -51,9 +51,9 @@ class TTxBlobsWritingFinished: public TExtendedTransactionBase<TColumnShard> {
}
};

class TTxBlobsWritingFailed: public TExtendedTransactionBase<TColumnShard> {
class TTxBlobsWritingFailed: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const NKikimrProto::EReplyStatus PutBlobResult;
std::vector<TInsertedPortions> Packs;

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NColumnShard {

class TTxWrite: public TExtendedTransactionBase<TColumnShard> {
class TTxWrite: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;

public:
TTxWrite(TColumnShard* self, const TEvPrivate::TEvWriteBlobsResult::TPtr& putBlobResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxDataFromSource: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxDataFromSource: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TDestinationSession> Session;
THashMap<ui64, NEvents::TPathIdData> PortionsByPathId;
THashMap<TString, THashSet<NBlobCache::TUnifiedBlobId>> SharedBlobIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxFinishAckFromInitiator: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxFinishAckFromInitiator: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TDestinationSession> Session;
protected:
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxFinishFromSource: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxFinishFromSource: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TDestinationSession> Session;
const TTabletId SourceTabletId;
bool Finished = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxProposeFromInitiator: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxProposeFromInitiator: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TDestinationSession> Session;
THashMap<TString, std::shared_ptr<TDestinationSession>>* Sessions;
protected:
Expand All @@ -23,9 +23,9 @@ class TTxProposeFromInitiator: public NColumnShard::TExtendedTransactionBase<NCo
TTxType GetTxType() const override { return NColumnShard::TXTYPE_DATA_SHARING_PROPOSE_FROM_INITIATOR; }
};

class TTxConfirmFromInitiator: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxConfirmFromInitiator: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TDestinationSession> Session;
protected:
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace NKikimr::NOlap::NDataSharing {

class TTaskForTablet;

class TTxApplyLinksModification: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxApplyLinksModification: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TTaskForTablet> Task;
const TTabletId InitiatorTabletId;
const TString SessionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxDataAckToSource: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxDataAckToSource: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TSourceSession> Session;
THashMap<TString, TTabletsByBlob> SharedBlobIds;
protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxFinishAckToSource: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxFinishAckToSource: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TSourceSession> Session;
protected:
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxStartSourceCursor: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxStartSourceCursor: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;

TSourceSession* Session;
THashMap<ui64, std::vector<TPortionDataAccessor>> Portions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxStartToSource: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxStartToSource: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TSourceSession> Session;
THashMap<TString, std::shared_ptr<TSourceSession>>* Sessions;
protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

namespace NKikimr::NOlap::NDataSharing {

class TTxWriteSourceCursor: public NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTxWriteSourceCursor: public NColumnShard::TExtendedTransactionBase {
private:
using TBase = NColumnShard::TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = NColumnShard::TExtendedTransactionBase;
std::shared_ptr<TSourceSession> Session;
protected:
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/inflight_request_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ void TInFlightReadsTracker::AddToInFlightRequest(
}

namespace {
class TTransactionSavePersistentSnapshots: public TExtendedTransactionBase<NColumnShard::TColumnShard> {
class TTransactionSavePersistentSnapshots: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<NColumnShard::TColumnShard>;
using TBase = TExtendedTransactionBase;
const std::set<NOlap::TSnapshot> SaveSnapshots;
const std::set<NOlap::TSnapshot> RemoveSnapshots;
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override {
Expand Down
25 changes: 22 additions & 3 deletions ydb/core/tx/columnshard/tablet/ext_tx_base.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
#include "tx_extension.h"
#include "ext_tx_base.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>

namespace NKikimr::NColumnShard::NDataSharing {

}
namespace NKikimr::NColumnShard {

bool TExtendedTransactionBase::Execute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("local_tx_no", TabletTxNo)("method", "execute")("tx_info", TxInfo);
return DoExecute(txc, ctx);
}
void TExtendedTransactionBase::Complete(const NActors::TActorContext& ctx) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("local_tx_no", TabletTxNo)("method", "complete")("tx_info", TxInfo);
DoComplete(ctx);
}

TExtendedTransactionBase::TExtendedTransactionBase(TColumnShard* self, const TString& txInfo)
: TBase(self)
, TxInfo(txInfo)
, TabletTxNo(++Self->TabletTxCounter)
{

}

} //namespace NKikimr::NColumnShard
25 changes: 7 additions & 18 deletions ydb/core/tx/columnshard/tablet/ext_tx_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,22 @@

namespace NKikimr::NColumnShard {

class TColumnShard;

//Base class for LocalDB transactions with ColumnShard specific
template <class TShard>
class TExtendedTransactionBase: public NTabletFlatExecutor::TTransactionBase<TShard> {
class TExtendedTransactionBase: public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
private:
const TString TxInfo;
const ui32 TabletTxNo;
using TBase = NTabletFlatExecutor::TTransactionBase<TShard>;
using TBase = NTabletFlatExecutor::TTransactionBase<TColumnShard>;
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) = 0;
virtual void DoComplete(const NActors::TActorContext & ctx) = 0;

public:
virtual bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) override final {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build()("tablet_id", TBase::Self->TabletID())("local_tx_no", TabletTxNo)("tx_info", TxInfo);
return DoExecute(txc, ctx);
}
virtual void Complete(const NActors::TActorContext& ctx) override final {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build()("tablet_id", TBase::Self->TabletID())("local_tx_no", TabletTxNo)("tx_info", TxInfo);
return DoComplete(ctx);
}

TExtendedTransactionBase(TShard* self, const TString& txInfo = Default<TString>())
: TBase(self)
, TxInfo(txInfo)
, TabletTxNo(++TBase::Self->TabletTxCounter)
{
virtual bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) override final;
virtual void Complete(const NActors::TActorContext& ctx) override final;

}
TExtendedTransactionBase(TColumnShard* self, const TString& txInfo = Default<TString>());
};

} //namespace NKikimr::NColumnShard
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/tablet/ya.make
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
LIBRARY()

SRCS(
ext_tx_base.h
ext_tx_base.cpp
write_queue.cpp
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
virtual TString DoDebugString() const override {
return "EV_WRITE_PRIMARY";
}
class TTxWriteReceivedBrokenFlag: public TExtendedTransactionBase<TColumnShard> {
class TTxWriteReceivedBrokenFlag: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const ui64 TxId;
const ui64 TabletId;
const bool BrokenFlag;
Expand Down Expand Up @@ -126,9 +126,9 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
return std::make_unique<TTxWriteReceivedBrokenFlag>(owner, GetTxId(), sendTabletId, broken);
}

class TTxWriteReceivedResultAck: public TExtendedTransactionBase<TColumnShard> {
class TTxWriteReceivedResultAck: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const ui64 TxId;
const ui64 TabletId;

Expand Down Expand Up @@ -222,9 +222,9 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
CheckFinished(owner);
}

class TTxStartPreparation: public TExtendedTransactionBase<TColumnShard> {
class TTxStartPreparation: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const ui64 TxId;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
virtual TString DoDebugString() const override {
return "EV_WRITE_SECONDARY";
}
class TTxWriteReceivedAck: public TExtendedTransactionBase<TColumnShard> {
class TTxWriteReceivedAck: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const ui64 TxId;
bool NeedContinueFlag = false;

Expand Down Expand Up @@ -99,9 +99,9 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
return std::make_unique<TTxWriteReceivedAck>(owner, GetTxId());
}

class TTxWriteReceivedBrokenFlag: public TExtendedTransactionBase<TColumnShard> {
class TTxWriteReceivedBrokenFlag: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const ui64 TxId;
const bool BrokenFlag;

Expand Down Expand Up @@ -170,9 +170,9 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
}
}

class TTxStartPreparation: public TExtendedTransactionBase<TColumnShard> {
class TTxStartPreparation: public TExtendedTransactionBase {
private:
using TBase = TExtendedTransactionBase<TColumnShard>;
using TBase = TExtendedTransactionBase;
const ui64 TxId;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
Expand Down
Loading