Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead) {
if (HasUncommittedChangesRead || HasOlapTable) {
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -297,7 +297,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
return false;
}

Expand Down
52 changes: 32 additions & 20 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, UseEvWrite(useEvWrite)
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
, HtapTx(tableServiceConfig.GetEnableHtapTx())
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
, HtapTx(htapTx)
{
Target = creator;

Expand Down Expand Up @@ -1487,7 +1487,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

auto getShardTask = [&](ui64 shardId) -> TTask& {
YQL_ENSURE(!UseEvWrite);
YQL_ENSURE(!UseEvWriteForOltp);
auto it = shardTasks.find(shardId);
if (it != shardTasks.end()) {
return TasksGraph.GetTask(it->second);
Expand Down Expand Up @@ -1627,7 +1627,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
{
YQL_ENSURE(!UseEvWrite);
YQL_ENSURE(!UseEvWriteForOltp);
TShardState shardState;
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
shardState.DatashardState.ConstructInPlace();
Expand Down Expand Up @@ -2030,7 +2030,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TDatashardTxs datashardTxs;
TEvWriteTxs evWriteTxs;
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());

// Single-shard datashard transactions are always immediate
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
Expand Down Expand Up @@ -2261,7 +2260,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");
NKikimrDataEvents::TKqpLocks* locks = nullptr;

if (UseEvWrite) {
if (UseEvWriteForOltp || ShardIdToTableInfo->Get(shardId).IsOlap) {
if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) {
locks = it->second->MutableLocks();
} else {
Expand Down Expand Up @@ -2333,15 +2332,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Note: currently persistent channels are never used
!HasPersistentChannels &&
// Can't use volatile transactions for EvWrite at current time
!UseEvWrite);
evWriteTxs.empty());

const bool useGenericReadSets = (
// Use generic readsets when feature is explicitly enabled
AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() ||
// Volatile transactions must always use generic readsets
VolatileTx ||
// Transactions with topics must always use generic readsets
!topicTxs.empty());
!topicTxs.empty() ||
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
Expand Down Expand Up @@ -2463,12 +2464,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
std::sort(receivingShards.begin(), receivingShards.end());

for (auto& [shardId, shardTx] : datashardTxs) {
AFL_ENSURE(!columnShardArbiter);
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
if (columnShardArbiter) {
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
if (sendingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddSendingShards(shardId);
}
if (receivingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddReceivingShards(shardId);
}
AFL_ENSURE(!arbiter);
} else {
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}
}

Expand Down Expand Up @@ -2844,11 +2856,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool UseEvWrite = false;
const bool UseEvWriteForOltp = false;
const bool HtapTx = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;
const bool HtapTx = false;

bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
Expand Down Expand Up @@ -2890,13 +2902,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
std::move(asyncIoFactory), creator, userRequestContext,
useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx);
statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
}

} // namespace NKqp
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
Expand Down
17 changes: 8 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false, tableServiceConfig,
std::move(asyncIoFactory), creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
);
}

Expand All @@ -113,8 +112,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false, tableServiceConfig,
std::move(asyncIoFactory), creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
Expand All @@ -128,8 +127,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx
userRequestContext, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo
);

default:
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2031,10 +2031,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ class TKqpQueryState : public TNonCopyable {
return false;
}

if (TxCtx->HasOlapTable) {
// HTAP/OLAP transactions always use separate commit.
return false;
}

if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (tx && tx->GetHasEffects()) {
YQL_ENSURE(tx->ResultsSize() == 0);
Expand Down
21 changes: 2 additions & 19 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) {
request.UseImmediateEffects = true;
}
}
Expand Down Expand Up @@ -1292,29 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.ResourceManager_ = ResourceManager_;
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());

bool useEvWrite = (
(txCtx->HasOlapTable // olap only
&& !txCtx->HasOltpTable
&& Settings.TableService.GetEnableOlapSink())
|| (txCtx->HasOltpTable // oltp only
&& !txCtx->HasOlapTable
&& Settings.TableService.GetEnableOltpSink())
|| (txCtx->HasOlapTable // htap
&& txCtx->HasOltpTable
&& Settings.TableService.GetEnableOlapSink()
&& Settings.TableService.GetEnableHtapTx()))
&& (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML)
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML));
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService,
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo,
Settings.TableService.GetEnableHtapTx());
QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo);

auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
Expand Down
Loading