Skip to content

Fix sink flags #19985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool HasTableWrite = false;
bool HasTableRead = false;

std::optional<bool> EnableOltpSink;
std::optional<bool> EnableOlapSink;
std::optional<bool> EnableHtapTx;

bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;

Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
{
Target = creator;

YQL_ENSURE(!TxManager || tableServiceConfig.GetEnableOltpSink());
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);

if (Request.AcquireLocksTxId || Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
YQL_ENSURE(querySettings.Type);
queryProto.SetType(GetPhyQueryType(*querySettings.Type));

queryProto.SetEnableOltpSink(Config->EnableOltpSink);
queryProto.SetEnableOlapSink(Config->EnableOlapSink);
queryProto.SetEnableHtapTx(Config->EnableHtapTx);

for (const auto& queryBlock : dataQueryBlocks) {
auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock);
if (queryBlockSettings.HasUncommittedChangesRead) {
Expand Down
41 changes: 34 additions & 7 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,33 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
if (!QueryState->TxCtx->EnableOltpSink) {
QueryState->TxCtx->EnableOltpSink = phyQuery.GetEnableOltpSink();
}
if (QueryState->TxCtx->EnableOltpSink != phyQuery.GetEnableOltpSink()) {
ReplyQueryError(Ydb::StatusIds::ABORTED,
"Transaction execution settings have been changed (EnableOltpSink).");
return false;
}

if (!QueryState->TxCtx->EnableOlapSink) {
QueryState->TxCtx->EnableOlapSink = phyQuery.GetEnableOlapSink();
}
if (QueryState->TxCtx->EnableOlapSink != phyQuery.GetEnableOlapSink()) {
ReplyQueryError(Ydb::StatusIds::ABORTED,
"Transaction execution settings have been changed (EnableOlapSink).");
return false;
}

if (!QueryState->TxCtx->EnableHtapTx) {
QueryState->TxCtx->EnableHtapTx = phyQuery.GetEnableHtapTx();
}
if (QueryState->TxCtx->EnableHtapTx != phyQuery.GetEnableHtapTx()) {
ReplyQueryError(Ydb::StatusIds::ABORTED,
"Transaction execution settings have been changed (EnableHtapTx).");
return false;
}

const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
Expand All @@ -937,7 +964,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite;
QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead;
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
&& !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) {
&& !*QueryState->TxCtx->EnableHtapTx && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Write transactions between column and row tables are disabled at current time.");
return false;
Expand Down Expand Up @@ -1178,7 +1205,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
return;
}

if (Settings.TableService.GetEnableOltpSink() && isBatchQuery) {
if (*QueryState->TxCtx->EnableOltpSink && isBatchQuery) {
if (!Settings.TableService.GetEnableBatchUpdates()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"BATCH operations are disabled by EnableBatchUpdates flag.");
Expand Down Expand Up @@ -1353,7 +1380,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
}

if (Settings.TableService.GetEnableOltpSink()) {
if (*txCtx.EnableOltpSink) {
if (txCtx.TxHasEffects() || hasLocks || txCtx.TopicOperations.HasOperations()) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
Expand Down Expand Up @@ -1390,7 +1417,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
}
request.TopicOperations = std::move(txCtx.TopicOperations);
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || *txCtx.EnableOlapSink)) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

if (!txCtx.CanDeferEffects()) {
Expand Down Expand Up @@ -1453,12 +1480,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.ResourceManager_ = ResourceManager_;
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());

if (Settings.TableService.GetEnableOltpSink() && !txCtx->TxManager) {
if (*txCtx->EnableOltpSink && !txCtx->TxManager) {
txCtx->TxManager = CreateKqpTransactionManager();
txCtx->TxManager->SetAllowVolatile(AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions());
}

if (Settings.TableService.GetEnableOltpSink()
if (*txCtx->EnableOltpSink
&& !txCtx->BufferActorId
&& (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) {
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
Expand Down Expand Up @@ -1493,7 +1520,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
};
auto* actor = CreateKqpBufferWriterActor(std::move(settings));
txCtx->BufferActorId = RegisterWithSameMailbox(actor);
} else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) {
} else if (*txCtx->EnableOltpSink && txCtx->BufferActorId) {
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
txCtx->TxManager->AddTopicsToShards();
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -581,4 +581,8 @@ message TKqpPhyQuery {
string QueryDiagnostics = 10;

repeated TKqpTableInfo ViewInfos = 11;

bool EnableOltpSink = 12;
bool EnableOlapSink = 13;
bool EnableHtapTx = 14;
}
Loading