Skip to content

Commit 879bae2

Browse files
committed
fix
1 parent 2722753 commit 879bae2

File tree

6 files changed

+24
-35
lines changed

6 files changed

+24
-35
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
127127
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
128128
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
129129
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
130-
const bool useEvWriteForOltp, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131-
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
130+
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131+
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
132132
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
133133
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
134134
, AsyncIoFactory(std::move(asyncIoFactory))
135-
, UseEvWriteForOltp(useEvWriteForOltp)
135+
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
136+
, HtapTx(tableServiceConfig.GetEnableHtapTx())
136137
, FederatedQuerySetup(federatedQuerySetup)
137138
, GUCSettings(GUCSettings)
138139
, ShardIdToTableInfo(shardIdToTableInfo)
139-
, HtapTx(htapTx)
140140
{
141141
Target = creator;
142142

@@ -2850,11 +2850,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28502850

28512851
private:
28522852
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2853-
bool UseEvWriteForOltp = false;
2853+
const bool UseEvWriteForOltp = false;
2854+
const bool HtapTx = false;
28542855
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
28552856
const TGUCSettings::TPtr GUCSettings;
28562857
TShardIdToTableInfoPtr ShardIdToTableInfo;
2857-
const bool HtapTx = false;
28582858

28592859
bool HasExternalSources = false;
28602860
bool SecretSnapshotRequired = false;
@@ -2896,13 +2896,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28962896
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
28972897
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
28982898
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
2899-
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWriteForOltp, ui32 statementResultIndex,
2899+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
29002900
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
2901-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
2901+
const TShardIdToTableInfoPtr& shardIdToTableInfo)
29022902
{
29032903
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
29042904
std::move(asyncIoFactory), creator, userRequestContext,
2905-
useEvWriteForOltp, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx);
2905+
statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
29062906
}
29072907

29082908
} // namespace NKqp

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
9595
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
9696
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
9797
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
98-
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
99-
const bool useEvWriteForOltp, ui32 statementResultIndex,
98+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
10099
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
101-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
100+
const TShardIdToTableInfoPtr& shardIdToTableInfo);
102101

103102
IActor* CreateKqpSchemeExecuter(
104103
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,

ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
8080
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
8181
const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
8282
TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
83-
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
84-
const bool useEvWriteForOltp, ui32 statementResultIndex,
83+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
8584
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
86-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
85+
const TShardIdToTableInfoPtr& shardIdToTableInfo)
8786
{
8887
if (request.Transactions.empty()) {
8988
// commit-only or rollback-only data transaction
9089
return CreateKqpDataExecuter(
9190
std::move(request), database, userToken, counters, false, tableServiceConfig,
9291
std::move(asyncIoFactory), creator,
93-
userRequestContext, useEvWriteForOltp, statementResultIndex,
94-
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
92+
userRequestContext, statementResultIndex,
93+
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
9594
);
9695
}
9796

@@ -113,8 +112,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
113112
return CreateKqpDataExecuter(
114113
std::move(request), database, userToken, counters, false, tableServiceConfig,
115114
std::move(asyncIoFactory), creator,
116-
userRequestContext, useEvWriteForOltp, statementResultIndex,
117-
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
115+
userRequestContext, statementResultIndex,
116+
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
118117
);
119118

120119
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
@@ -128,8 +127,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
128127
return CreateKqpDataExecuter(
129128
std::move(request), database, userToken, counters, true,
130129
tableServiceConfig, std::move(asyncIoFactory), creator,
131-
userRequestContext, useEvWriteForOltp, statementResultIndex,
132-
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx
130+
userRequestContext, statementResultIndex,
131+
federatedQuerySetup, GUCSettings, shardIdToTableInfo
133132
);
134133

135134
default:

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,10 +2031,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
20312031
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
20322032
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
20332033
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
2034-
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
2035-
const bool useEvWriteForOltp, ui32 statementResultIndex,
2034+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
20362035
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
2037-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
2036+
const TShardIdToTableInfoPtr& shardIdToTableInfo);
20382037

20392038
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
20402039
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,8 @@ class TKqpQueryState : public TNonCopyable {
413413
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
414414
auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx);
415415

416-
if (TxCtx->CanDeferEffects() && TxCtx->HasOlapTable) {
417-
// HTAP/OLAP transactions and sinks can't be deffered.
416+
if (TxCtx->CanDeferEffects()) {
417+
// At current time sinks require separate tnx with commit.
418418
while (tx && tx->GetHasEffects() && !HasTxSinkInTx(tx)) {
419419
QueryData->CreateKqpValueMap(tx);
420420
bool success = TxCtx->AddDeferredEffect(tx, QueryData);

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,20 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12921292
request.ResourceManager_ = ResourceManager_;
12931293
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
12941294

1295-
const bool querySupportsEvWrite = (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED
1296-
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY
1297-
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
1298-
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML)
1299-
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML));
1300-
const bool useEvWriteForOltp = querySupportsEvWrite && Settings.TableService.GetEnableOltpSink();
1301-
13021295
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
13031296
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
13041297
RequestCounters, Settings.TableService,
13051298
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
13061299
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
1307-
useEvWriteForOltp, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo,
1308-
Settings.TableService.GetEnableHtapTx());
1300+
QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo);
13091301

13101302
auto exId = RegisterWithSameMailbox(executerActor);
13111303
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);

0 commit comments

Comments
 (0)