Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
if (!commitTx)
return true;

if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) {
return true;
}

size_t readPhases = 0;
bool hasEffects = false;
bool hasSourceRead = false;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) {
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
}
if (GetSnapshot().IsValid() && settings.GetIsOlap()) {
settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);
}
output.SinkSettings.ConstructInPlace();
output.SinkSettings->PackFrom(settings);
} else {
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const bool inconsistentTx,
const NMiniKQL::TTypeEnvironment& typeEnv,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const std::optional<NKikimrDataEvents::TMvccSnapshot>& mvccSnapshot,
const IKqpTransactionManagerPtr& txManager,
const TActorId sessionActorId,
TIntrusivePtr<TKqpCounters> counters,
NWilson::TTraceId traceId)
: TypeEnv(typeEnv)
, Alloc(alloc)
, MvccSnapshot(mvccSnapshot)
, TableId(tableId)
, TablePath(tablePath)
, LockTxId(lockTxId)
Expand Down Expand Up @@ -813,6 +815,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
} else if (!InconsistentTx) {
evWrite->SetLockId(LockTxId, LockNodeId);
if (MvccSnapshot) {
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
}
}

const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite);
Expand Down Expand Up @@ -954,6 +959,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const NMiniKQL::TTypeEnvironment& TypeEnv;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;

std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot;

const TTableId TableId;
const TString TablePath;

Expand Down Expand Up @@ -1019,6 +1026,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
Settings.GetInconsistentTx(),
TypeEnv,
Alloc,
Settings.GetIsOlap()
? std::optional<NKikimrDataEvents::TMvccSnapshot>{Settings.GetMvccSnapshot()}
: std::optional<NKikimrDataEvents::TMvccSnapshot>{},
nullptr,
TActorId{},
Counters,
Expand Down Expand Up @@ -1206,6 +1216,7 @@ struct TTransactionSettings {
ui64 LockTxId = 0;
ui64 LockNodeId = 0;
bool InconsistentTx = false;
NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
};

struct TWriteSettings {
Expand Down Expand Up @@ -1333,6 +1344,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
InconsistentTx,
TypeEnv,
Alloc,
std::nullopt,
TxManager,
SessionActorId,
Counters,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ message TKqpTableSinkSettings {
optional NActorsProto.TActorId BufferActorId = 10;
optional int64 Priority = 11;
optional bool IsOlap = 12;
optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13;
}

message TKqpStreamLookupSettings {
Expand Down
Loading