@@ -127,16 +127,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
127127 const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
128128 NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
129129 const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
130- const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131- const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx )
130+ ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131+ const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
132132 : TBase(std::move(request), database, userToken, counters, tableServiceConfig,
133133 userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, " DataExecuter" , streamResult)
134134 , AsyncIoFactory(std::move(asyncIoFactory))
135- , UseEvWrite(useEvWrite)
135+ , UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
136+ , HtapTx(tableServiceConfig.GetEnableHtapTx())
136137 , FederatedQuerySetup(federatedQuerySetup)
137138 , GUCSettings(GUCSettings)
138139 , ShardIdToTableInfo(shardIdToTableInfo)
139- , HtapTx(htapTx)
140140 {
141141 Target = creator;
142142
@@ -1487,7 +1487,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
14871487 auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
14881488
14891489 auto getShardTask = [&](ui64 shardId) -> TTask& {
1490- YQL_ENSURE (!UseEvWrite );
1490+ YQL_ENSURE (!UseEvWriteForOltp );
14911491 auto it = shardTasks.find (shardId);
14921492 if (it != shardTasks.end ()) {
14931493 return TasksGraph.GetTask (it->second );
@@ -1627,7 +1627,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16271627
16281628 void ExecuteDatashardTransaction (ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
16291629 {
1630- YQL_ENSURE (!UseEvWrite );
1630+ YQL_ENSURE (!UseEvWriteForOltp );
16311631 TShardState shardState;
16321632 shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
16331633 shardState.DatashardState .ConstructInPlace ();
@@ -2030,7 +2030,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20302030 TDatashardTxs datashardTxs;
20312031 TEvWriteTxs evWriteTxs;
20322032 BuildDatashardTxs (datashardTasks, datashardTxs, evWriteTxs, topicTxs);
2033- YQL_ENSURE (evWriteTxs.empty () || datashardTxs.empty ());
20342033
20352034 // Single-shard datashard transactions are always immediate
20362035 ImmediateTx = (datashardTxs.size () + evWriteTxs.size () + Request.TopicOperations .GetSize () + sourceScanPartitionsCount) <= 1
@@ -2261,7 +2260,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22612260 YQL_ENSURE (!locksList.empty (), " unexpected empty locks list in DataShardLocks" );
22622261 NKikimrDataEvents::TKqpLocks* locks = nullptr ;
22632262
2264- if (UseEvWrite ) {
2263+ if (UseEvWriteForOltp || ShardIdToTableInfo-> Get (shardId). IsOlap ) {
22652264 if (auto it = evWriteTxs.find (shardId); it != evWriteTxs.end ()) {
22662265 locks = it->second ->MutableLocks ();
22672266 } else {
@@ -2333,15 +2332,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23332332 // Note: currently persistent channels are never used
23342333 !HasPersistentChannels &&
23352334 // Can't use volatile transactions for EvWrite at current time
2336- !UseEvWrite );
2335+ evWriteTxs. empty () );
23372336
23382337 const bool useGenericReadSets = (
23392338 // Use generic readsets when feature is explicitly enabled
23402339 AppData ()->FeatureFlags .GetEnableDataShardGenericReadSets () ||
23412340 // Volatile transactions must always use generic readsets
23422341 VolatileTx ||
23432342 // Transactions with topics must always use generic readsets
2344- !topicTxs.empty ());
2343+ !topicTxs.empty () ||
2344+ // HTAP transactions always use generic readsets
2345+ !evWriteTxs.empty ());
23452346
23462347 if (!locksMap.empty () || VolatileTx ||
23472348 Request.TopicOperations .HasReadOperations () || Request.TopicOperations .HasWriteOperations ())
@@ -2463,12 +2464,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24632464 std::sort (receivingShards.begin (), receivingShards.end ());
24642465
24652466 for (auto & [shardId, shardTx] : datashardTxs) {
2466- AFL_ENSURE (!columnShardArbiter);
24672467 shardTx->MutableLocks ()->SetOp (NKikimrDataEvents::TKqpLocks::Commit);
2468- *shardTx->MutableLocks ()->MutableSendingShards () = sendingShards;
2469- *shardTx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2470- if (arbiter) {
2471- shardTx->MutableLocks ()->SetArbiterShard (arbiter);
2468+ if (columnShardArbiter) {
2469+ shardTx->MutableLocks ()->AddSendingShards (*columnShardArbiter);
2470+ shardTx->MutableLocks ()->AddReceivingShards (*columnShardArbiter);
2471+ if (sendingShardsSet.contains (shardId)) {
2472+ shardTx->MutableLocks ()->AddSendingShards (shardId);
2473+ }
2474+ if (receivingShardsSet.contains (shardId)) {
2475+ shardTx->MutableLocks ()->AddReceivingShards (shardId);
2476+ }
2477+ AFL_ENSURE (!arbiter);
2478+ } else {
2479+ *shardTx->MutableLocks ()->MutableSendingShards () = sendingShards;
2480+ *shardTx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2481+ if (arbiter) {
2482+ shardTx->MutableLocks ()->SetArbiterShard (arbiter);
2483+ }
24722484 }
24732485 }
24742486
@@ -2844,11 +2856,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28442856
28452857private:
28462858 NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2847- bool UseEvWrite = false ;
2859+ const bool UseEvWriteForOltp = false ;
2860+ const bool HtapTx = false ;
28482861 const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
28492862 const TGUCSettings::TPtr GUCSettings;
28502863 TShardIdToTableInfoPtr ShardIdToTableInfo;
2851- const bool HtapTx = false ;
28522864
28532865 bool HasExternalSources = false ;
28542866 bool SecretSnapshotRequired = false ;
@@ -2890,13 +2902,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28902902IActor* CreateKqpDataExecuter (IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
28912903 TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
28922904 NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
2893- const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
2905+ const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
28942906 const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
2895- const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx )
2907+ const TShardIdToTableInfoPtr& shardIdToTableInfo)
28962908{
28972909 return new TKqpDataExecuter (std::move (request), database, userToken, counters, streamResult, tableServiceConfig,
28982910 std::move (asyncIoFactory), creator, userRequestContext,
2899- useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx );
2911+ statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
29002912}
29012913
29022914} // namespace NKqp
0 commit comments