Skip to content

Commit 4dce871

Browse files
authored
Merge 9334977 into 02d2031
2 parents 02d2031 + 9334977 commit 4dce871

File tree

7 files changed

+38
-12
lines changed

7 files changed

+38
-12
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
169169
if (!commitTx)
170170
return true;
171171

172+
if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) {
173+
return true;
174+
}
175+
172176
size_t readPhases = 0;
173177
bool hasEffects = false;
174178
bool hasSourceRead = false;

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,18 @@ class TKqpTransactionManager : public IKqpTransactionManager {
189189
return !HasOlapTable();
190190
}
191191

192-
bool HasSnapshot() const override {
193-
return ValidSnapshot;
192+
const std::optional<NKikimrDataEvents::TMvccSnapshot>& GetSnapshot() const override {
193+
return Snapshot;
194194
}
195195

196-
void SetHasSnapshot(bool hasSnapshot) override {
197-
ValidSnapshot = hasSnapshot;
196+
void SetSnapshot(ui64 step, ui64 txId) override {
197+
Snapshot.emplace();
198+
Snapshot->SetStep(step);
199+
Snapshot->SetTxId(txId);
198200
}
199201

200202
bool BrokenLocks() const override {
201-
return LocksIssue.has_value() && !(HasSnapshot() && IsReadOnly());
203+
return LocksIssue.has_value() && !(GetSnapshot() && IsReadOnly());
202204
}
203205

204206
const std::optional<NYql::TIssue>& GetLockIssue() const override {
@@ -405,8 +407,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
405407
std::unordered_set<TString> TablePathes;
406408

407409
bool ReadOnly = true;
408-
bool ValidSnapshot = false;
409410
bool HasOlapTableShard = false;
411+
std::optional<NKikimrDataEvents::TMvccSnapshot> Snapshot;
410412
std::optional<NYql::TIssue> LocksIssue;
411413

412414
THashSet<ui64> SendingShards;

ydb/core/kqp/common/kqp_tx_manager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ class IKqpTransactionManager {
6161

6262
virtual bool IsVolatile() const = 0;
6363

64-
virtual bool HasSnapshot() const = 0;
65-
virtual void SetHasSnapshot(bool hasSnapshot) = 0;
64+
virtual const std::optional<NKikimrDataEvents::TMvccSnapshot>& GetSnapshot() const = 0;
65+
virtual void SetSnapshot(ui64 step, ui64 txId) = 0;
6666

6767
virtual bool BrokenLocks() const = 0;
6868
virtual const std::optional<NYql::TIssue>& GetLockIssue() const = 0;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
282282
}
283283
}
284284

285-
if (TxManager) {
286-
TxManager->SetHasSnapshot(GetSnapshot().IsValid());
287-
}
288-
289285
if (!BufferActorId || (ReadOnlyTx && Request.LocksOp != ELocksOp::Rollback)) {
290286
Become(&TKqpDataExecuter::FinalizeState);
291287
MakeResponseAndPassAway();
@@ -2241,6 +2237,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22412237
ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterAcquireSnapshot, ExecuterSpan.GetTraceId(), "WaitForSnapshot");
22422238

22432239
return;
2240+
} else if (TxManager && GetSnapshot().IsValid()) {
2241+
TxManager->SetSnapshot(GetSnapshot().Step, GetSnapshot().TxId);
22442242
}
22452243

22462244
ContinueExecute();
@@ -2276,6 +2274,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22762274

22772275
SetSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
22782276
ImmediateTx = true;
2277+
if (TxManager) {
2278+
TxManager->SetSnapshot(GetSnapshot().Step, GetSnapshot().TxId);
2279+
}
22792280

22802281
ContinueExecute();
22812282
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
952952
if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) {
953953
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
954954
}
955+
if (GetSnapshot().IsValid() && settings.GetIsOlap()) {
956+
settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
957+
settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);
958+
}
955959
output.SinkSettings.ConstructInPlace();
956960
output.SinkSettings->PackFrom(settings);
957961
} else {

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,14 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
170170
const bool inconsistentTx,
171171
const NMiniKQL::TTypeEnvironment& typeEnv,
172172
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
173+
const NKikimrDataEvents::TMvccSnapshot& mvccSnapshot,
173174
const IKqpTransactionManagerPtr& txManager,
174175
const TActorId sessionActorId,
175176
TIntrusivePtr<TKqpCounters> counters,
176177
NWilson::TTraceId traceId)
177178
: TypeEnv(typeEnv)
178179
, Alloc(alloc)
180+
, MvccSnapshot(mvccSnapshot)
179181
, TableId(tableId)
180182
, TablePath(tablePath)
181183
, LockTxId(lockTxId)
@@ -813,6 +815,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
813815
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
814816
} else if (!InconsistentTx) {
815817
evWrite->SetLockId(LockTxId, LockNodeId);
818+
*evWrite->Record.MutableMvccSnapshot() = MvccSnapshot;
816819
}
817820

818821
const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite);
@@ -954,6 +957,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
954957
const NMiniKQL::TTypeEnvironment& TypeEnv;
955958
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
956959

960+
NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
961+
957962
const TTableId TableId;
958963
const TString TablePath;
959964

@@ -1019,6 +1024,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
10191024
Settings.GetInconsistentTx(),
10201025
TypeEnv,
10211026
Alloc,
1027+
Settings.GetMvccSnapshot(),
10221028
nullptr,
10231029
TActorId{},
10241030
Counters,
@@ -1206,6 +1212,7 @@ struct TTransactionSettings {
12061212
ui64 LockTxId = 0;
12071213
ui64 LockNodeId = 0;
12081214
bool InconsistentTx = false;
1215+
NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
12091216
};
12101217

12111218
struct TWriteSettings {
@@ -1316,10 +1323,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
13161323
AFL_ENSURE(LockTxId == settings.TransactionSettings.LockTxId);
13171324
AFL_ENSURE(LockNodeId == settings.TransactionSettings.LockNodeId);
13181325
AFL_ENSURE(InconsistentTx == settings.TransactionSettings.InconsistentTx);
1326+
AFL_ENSURE(MvccSnapshot.GetStep() == settings.TransactionSettings.MvccSnapshot.GetStep());
1327+
AFL_ENSURE(MvccSnapshot.GetTxId() == settings.TransactionSettings.MvccSnapshot.GetTxId());
13191328
} else {
13201329
LockTxId = settings.TransactionSettings.LockTxId;
13211330
LockNodeId = settings.TransactionSettings.LockNodeId;
13221331
InconsistentTx = settings.TransactionSettings.InconsistentTx;
1332+
MvccSnapshot.SetStep(settings.TransactionSettings.MvccSnapshot.GetStep());
1333+
MvccSnapshot.SetTxId(settings.TransactionSettings.MvccSnapshot.GetTxId());
13231334
}
13241335

13251336
auto& writeInfo = WriteInfos[settings.TableId];
@@ -1333,6 +1344,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
13331344
InconsistentTx,
13341345
TypeEnv,
13351346
Alloc,
1347+
MvccSnapshot,
13361348
TxManager,
13371349
SessionActorId,
13381350
Counters,
@@ -2011,6 +2023,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20112023

20122024
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
20132025
NMiniKQL::TTypeEnvironment TypeEnv;
2026+
NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
20142027

20152028
struct TWriteInfo {
20162029
TKqpTableWriteActor* WriteTableActor = nullptr;
@@ -2137,6 +2150,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
21372150
.LockTxId = Settings.GetLockTxId(),
21382151
.LockNodeId = Settings.GetLockNodeId(),
21392152
.InconsistentTx = Settings.GetInconsistentTx(),
2153+
.MvccSnapshot = Settings.GetMvccSnapshot(),
21402154
},
21412155
.Priority = Settings.GetPriority(),
21422156
};

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ message TKqpTableSinkSettings {
719719
optional NActorsProto.TActorId BufferActorId = 10;
720720
optional int64 Priority = 11;
721721
optional bool IsOlap = 12;
722+
optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13;
722723
}
723724

724725
message TKqpStreamLookupSettings {

0 commit comments

Comments
 (0)