Skip to content

Commit 3096657

Browse files
authored
Snapshot Isolation: kqp (#12825)
1 parent e432d9c commit 3096657

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+485
-49
lines changed

.github/config/muted_ya.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk
4949
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
5050
ydb/core/kqp/ut/query KqpLimits.QueryExecTimeoutCancel
5151
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
52+
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWrite*
53+
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictRead*
5254
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
5355
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
5456
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication

ydb/core/data_integrity_trails/data_integrity_trails.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream&
2727
case TransactionSettings::kSnapshotReadOnly:
2828
LogKeyValue("TxMode", "SnapshotReadOnly", ss);
2929
break;
30+
case TransactionSettings::kSnapshotReadWrite:
31+
LogKeyValue("TxMode", "SnapshotReadWrite", ss);
32+
break;
3033
case TransactionSettings::TX_MODE_NOT_SET:
3134
LogKeyValue("TxMode", "Undefined", ss);
3235
break;

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::Tra
6464
case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
6565
to.mutable_snapshot_read_only();
6666
break;
67+
case Ydb::Query::TransactionSettings::kSnapshotReadWrite:
68+
to.mutable_snapshot_read_write();
69+
break;
6770
default:
6871
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
6972
"Invalid tx_settings"));

ydb/core/grpc_services/query/rpc_kqp_tx.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ class TBeginTransactionRPC : public TActorBootstrapped<TBeginTransactionRPC> {
104104
case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
105105
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_only();
106106
break;
107+
case Ydb::Query::TransactionSettings::kSnapshotReadWrite:
108+
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_write();
109+
break;
107110
}
108111

109112
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
158158
Y_UNUSED(config);
159159

160160
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE &&
161-
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO)
161+
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO &&
162+
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW)
162163
return false;
163164

164165
if (txCtx.GetSnapshot().IsValid())
@@ -211,26 +212,42 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
211212

212213
YQL_ENSURE(!hasSinkWrite || hasEffects);
213214

214-
// We don't want snapshot when there are effects at the moment,
215-
// because it hurts performance when there are multiple single-shard
216-
// reads and a single distributed commit. Taking snapshot costs
217-
// similar to an additional distributed transaction, and it's very
218-
// hard to predict when that happens, causing performance
219-
// degradation.
220-
if (hasEffects) {
221-
return false;
222-
}
223-
224215
// We need snapshot for stream lookup, because it's used for dependent reads
225216
if (hasStreamLookup) {
226217
return true;
227218
}
228219

220+
if (*txCtx.EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) {
221+
if (hasEffects && !txCtx.HasTableRead) {
222+
YQL_ENSURE(txCtx.HasTableWrite);
223+
// Don't need snapshot for WriteOnly transaction.
224+
return false;
225+
} else if (hasEffects) {
226+
YQL_ENSURE(txCtx.HasTableWrite);
227+
// ReadWrite transaction => need snapshot
228+
return true;
229+
}
230+
// ReadOnly transaction here
231+
} else {
232+
// We don't want snapshot when there are effects at the moment,
233+
// because it hurts performance when there are multiple single-shard
234+
// reads and a single distributed commit. Taking snapshot costs
235+
// similar to an additional distributed transaction, and it's very
236+
// hard to predict when that happens, causing performance
237+
// degradation.
238+
if (hasEffects) {
239+
return false;
240+
}
241+
}
242+
243+
YQL_ENSURE(!hasEffects && !hasStreamLookup);
244+
229245
// We need snapshot when there are multiple table read phases, most
230246
// likely it involves multiple tables and we would have to use a
231247
// distributed commit otherwise. Taking snapshot helps us avoid TLI
232248
// for read-only transactions, and costs less than a final distributed
233249
// commit.
250+
// NOTE: In case of read from single shard, we won't take snapshot.
234251
return readPhases > 1;
235252
}
236253

ydb/core/kqp/common/kqp_tx.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
235235
HasOlapTable = false;
236236
HasOltpTable = false;
237237
HasTableWrite = false;
238+
HasTableRead = false;
238239
NeedUncommittedChangesFlush = false;
239240
}
240241

@@ -264,16 +265,24 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
264265
Readonly = true;
265266
break;
266267

268+
case Ydb::Table::TransactionSettings::kSnapshotReadWrite:
269+
EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW;
270+
Readonly = false;
271+
break;
272+
267273
case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET:
268274
YQL_ENSURE(false, "tx_mode not set, settings: " << settings);
269275
break;
270276
};
271277
}
272278

273-
bool ShouldExecuteDeferredEffects() const {
279+
bool ShouldExecuteDeferredEffects(const TKqpPhyTxHolder::TConstPtr& tx) const {
274280
if (NeedUncommittedChangesFlush || HasOlapTable) {
275281
return !DeferredEffects.Empty();
276282
}
283+
if (EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW && !tx && HasTableRead) {
284+
return !DeferredEffects.Empty();
285+
}
277286

278287
return false;
279288
}
@@ -343,6 +352,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
343352
bool HasOlapTable = false;
344353
bool HasOltpTable = false;
345354
bool HasTableWrite = false;
355+
bool HasTableRead = false;
346356

347357
bool NeedUncommittedChangesFlush = false;
348358
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
652652
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
653653
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
654654
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
655+
kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();
655656

656657
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
657658
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
315315

316316
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
317317

318+
bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW();
319+
318320
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
319321
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
320322

@@ -346,7 +348,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
346348
TableServiceConfig.GetEnableAstCache() != enableAstCache ||
347349
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
348350
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
349-
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) {
351+
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
352+
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW) {
350353

351354
QueryCache->Clear();
352355

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,20 @@ namespace NKikimr::NKqp {
131131
using namespace NYql::NDq;
132132
using namespace NYql::NDqProto;
133133

134-
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
134+
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137137
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions,
138138
NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode)
139139
{
140-
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
140+
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory),
141141
settings, memoryLimits, std::move(traceId), std::move(arena), mode);
142142
}
143143

144144
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
145145
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
146-
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
147-
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
146+
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
147+
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, shardsScanningPolicy, counters, std::move(traceId));
148148
}
149149

150150
}

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,15 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
5252
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
5353
TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode);
5454

55-
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
56-
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
57-
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
55+
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
56+
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings,
57+
const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
5858
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode);
5959

6060
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
6161
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
62-
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
62+
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode,
63+
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
6364

6465
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
6566
TIntrusivePtr<TKqpCounters> counters,

0 commit comments

Comments
 (0)