Skip to content

Commit ee5c320

Browse files
authored
Merge 220c1c6 into 57ae3a3
2 parents 57ae3a3 + 220c1c6 commit ee5c320

File tree

4 files changed

+21
-0
lines changed

4 files changed

+21
-0
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
169169
if (!commitTx)
170170
return true;
171171

172+
if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) {
173+
return true;
174+
}
175+
172176
size_t readPhases = 0;
173177
bool hasEffects = false;
174178
bool hasSourceRead = false;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
952952
if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) {
953953
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
954954
}
955+
if (GetSnapshot().IsValid() && settings.GetIsOlap()) {
956+
settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
957+
settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);
958+
}
955959
output.SinkSettings.ConstructInPlace();
956960
output.SinkSettings->PackFrom(settings);
957961
} else {

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,14 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
170170
const bool inconsistentTx,
171171
const NMiniKQL::TTypeEnvironment& typeEnv,
172172
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
173+
const std::optional<NKikimrDataEvents::TMvccSnapshot>& mvccSnapshot,
173174
const IKqpTransactionManagerPtr& txManager,
174175
const TActorId sessionActorId,
175176
TIntrusivePtr<TKqpCounters> counters,
176177
NWilson::TTraceId traceId)
177178
: TypeEnv(typeEnv)
178179
, Alloc(alloc)
180+
, MvccSnapshot(mvccSnapshot)
179181
, TableId(tableId)
180182
, TablePath(tablePath)
181183
, LockTxId(lockTxId)
@@ -812,6 +814,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
812814
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
813815
} else if (!InconsistentTx) {
814816
evWrite->SetLockId(LockTxId, LockNodeId);
817+
if (MvccSnapshot) {
818+
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
819+
}
815820
}
816821

817822
const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite);
@@ -953,6 +958,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
953958
const NMiniKQL::TTypeEnvironment& TypeEnv;
954959
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
955960

961+
std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot;
962+
956963
const TTableId TableId;
957964
const TString TablePath;
958965

@@ -1018,6 +1025,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
10181025
Settings.GetInconsistentTx(),
10191026
TypeEnv,
10201027
Alloc,
1028+
Settings.GetIsOlap()
1029+
? std::optional<NKikimrDataEvents::TMvccSnapshot>{Settings.GetMvccSnapshot()}
1030+
: std::optional<NKikimrDataEvents::TMvccSnapshot>{},
10211031
nullptr,
10221032
TActorId{},
10231033
Counters,
@@ -1205,6 +1215,7 @@ struct TTransactionSettings {
12051215
ui64 LockTxId = 0;
12061216
ui64 LockNodeId = 0;
12071217
bool InconsistentTx = false;
1218+
NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
12081219
};
12091220

12101221
struct TWriteSettings {
@@ -1333,6 +1344,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
13331344
InconsistentTx,
13341345
TypeEnv,
13351346
Alloc,
1347+
std::nullopt,
13361348
TxManager,
13371349
SessionActorId,
13381350
Counters,

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ message TKqpTableSinkSettings {
719719
optional NActorsProto.TActorId BufferActorId = 10;
720720
optional int64 Priority = 11;
721721
optional bool IsOlap = 12;
722+
optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13;
722723
}
723724

724725
message TKqpStreamLookupSettings {

0 commit comments

Comments
 (0)