@@ -124,21 +124,19 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
124124 TKqpDataExecuter (IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
125125 const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
126126 TKqpRequestCounters::TPtr counters, bool streamResult,
127- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig ,
127+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig ,
128128 NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
129- const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
130- const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
131129 const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
132- const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
133- const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx )
134- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation ,
130+ ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131+ const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
132+ : TBase(std::move(request), database, userToken, counters, tableServiceConfig ,
135133 userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, " DataExecuter" , streamResult)
136134 , AsyncIoFactory(std::move(asyncIoFactory))
137- , UseEvWrite(useEvWrite)
135+ , UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
136+ , HtapTx(tableServiceConfig.GetEnableHtapTx())
138137 , FederatedQuerySetup(federatedQuerySetup)
139138 , GUCSettings(GUCSettings)
140139 , ShardIdToTableInfo(shardIdToTableInfo)
141- , HtapTx(htapTx)
142140 {
143141 Target = creator;
144142
@@ -1487,7 +1485,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
14871485 auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
14881486
14891487 auto getShardTask = [&](ui64 shardId) -> TTask& {
1490- YQL_ENSURE (!UseEvWrite );
1488+ YQL_ENSURE (!UseEvWriteForOltp );
14911489 auto it = shardTasks.find (shardId);
14921490 if (it != shardTasks.end ()) {
14931491 return TasksGraph.GetTask (it->second );
@@ -1627,7 +1625,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16271625
16281626 void ExecuteDatashardTransaction (ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
16291627 {
1630- YQL_ENSURE (!UseEvWrite );
1628+ YQL_ENSURE (!UseEvWriteForOltp );
16311629 TShardState shardState;
16321630 shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
16331631 shardState.DatashardState .ConstructInPlace ();
@@ -2025,7 +2023,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20252023 TDatashardTxs datashardTxs;
20262024 TEvWriteTxs evWriteTxs;
20272025 BuildDatashardTxs (datashardTasks, datashardTxs, evWriteTxs, topicTxs);
2028- YQL_ENSURE (evWriteTxs.empty () || datashardTxs.empty ());
20292026
20302027 // Single-shard datashard transactions are always immediate
20312028 ImmediateTx = (datashardTxs.size () + evWriteTxs.size () + Request.TopicOperations .GetSize () + sourceScanPartitionsCount) <= 1
@@ -2256,7 +2253,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22562253 YQL_ENSURE (!locksList.empty (), " unexpected empty locks list in DataShardLocks" );
22572254 NKikimrDataEvents::TKqpLocks* locks = nullptr ;
22582255
2259- if (UseEvWrite ) {
2256+ if (UseEvWriteForOltp || ShardIdToTableInfo-> Get (shardId). IsOlap ) {
22602257 if (auto it = evWriteTxs.find (shardId); it != evWriteTxs.end ()) {
22612258 locks = it->second ->MutableLocks ();
22622259 } else {
@@ -2328,15 +2325,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23282325 // Note: currently persistent channels are never used
23292326 !HasPersistentChannels &&
23302327 // Can't use volatile transactions for EvWrite at current time
2331- !UseEvWrite );
2328+ evWriteTxs. empty () );
23322329
23332330 const bool useGenericReadSets = (
23342331 // Use generic readsets when feature is explicitly enabled
23352332 AppData ()->FeatureFlags .GetEnableDataShardGenericReadSets () ||
23362333 // Volatile transactions must always use generic readsets
23372334 VolatileTx ||
23382335 // Transactions with topics must always use generic readsets
2339- !topicTxs.empty ());
2336+ !topicTxs.empty () ||
2337+ // HTAP transactions always use generic readsets
2338+ !evWriteTxs.empty ());
23402339
23412340 if (!locksMap.empty () || VolatileTx ||
23422341 Request.TopicOperations .HasReadOperations () || Request.TopicOperations .HasWriteOperations ())
@@ -2479,10 +2478,22 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24792478
24802479 for (auto & [shardId, shardTx] : datashardTxs) {
24812480 shardTx->MutableLocks ()->SetOp (NKikimrDataEvents::TKqpLocks::Commit);
2482- *shardTx->MutableLocks ()->MutableSendingShards () = sendingShards;
2483- *shardTx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2484- if (arbiter) {
2485- shardTx->MutableLocks ()->SetArbiterShard (arbiter);
2481+ if (columnShardArbiter) {
2482+ shardTx->MutableLocks ()->AddSendingShards (*columnShardArbiter);
2483+ shardTx->MutableLocks ()->AddReceivingShards (*columnShardArbiter);
2484+ if (sendingShardsSet.contains (shardId)) {
2485+ shardTx->MutableLocks ()->AddSendingShards (shardId);
2486+ }
2487+ if (receivingShardsSet.contains (shardId)) {
2488+ shardTx->MutableLocks ()->AddReceivingShards (shardId);
2489+ }
2490+ AFL_ENSURE (!arbiter);
2491+ } else {
2492+ *shardTx->MutableLocks ()->MutableSendingShards () = sendingShards;
2493+ *shardTx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2494+ if (arbiter) {
2495+ shardTx->MutableLocks ()->SetArbiterShard (arbiter);
2496+ }
24862497 }
24872498 }
24882499
@@ -2836,11 +2847,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28362847
28372848private:
28382849 NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2839- bool UseEvWrite = false ;
2850+ const bool UseEvWriteForOltp = false ;
2851+ const bool HtapTx = false ;
28402852 const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
28412853 const TGUCSettings::TPtr GUCSettings;
28422854 TShardIdToTableInfoPtr ShardIdToTableInfo;
2843- const bool HtapTx = false ;
28442855
28452856 bool HasExternalSources = false ;
28462857 bool SecretSnapshotRequired = false ;
@@ -2879,18 +2890,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28792890
28802891} // namespace
28812892
2882- IActor* CreateKqpDataExecuter (IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
2883- TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
2884- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
2885- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
2886- const TIntrusivePtr<TUserRequestContext>& userRequestContext,
2887- const bool useEvWrite, ui32 statementResultIndex,
2893+ IActor* CreateKqpDataExecuter (IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
2894+ const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
2895+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
2896+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
2897+ const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
28882898 const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
2889- const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx )
2899+ const TShardIdToTableInfoPtr& shardIdToTableInfo)
28902900{
2891- return new TKqpDataExecuter (std::move (request), database, userToken, counters, streamResult, executerRetriesConfig,
2892- std::move (asyncIoFactory), chanTransportVersion, aggregation , creator, userRequestContext,
2893- useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx );
2901+ return new TKqpDataExecuter (std::move (request), database, userToken, counters, streamResult,
2902+ tableServiceConfig, std::move (asyncIoFactory), creator, userRequestContext,
2903+ statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
28942904}
28952905
28962906} // namespace NKqp
0 commit comments