Skip to content

mutable executer config #19977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
Expand All @@ -106,16 +106,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const IKqpTransactionManagerPtr& txManager,
const TActorId bufferActorId,
TMaybe<TBatchOperationSettings> batchOperationSettings = Nothing())
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, database, userToken, counters, tableServiceConfig,
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, database, userToken, counters, executerConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter,
"DataExecuter", streamResult, bufferActorId, txManager, std::move(batchOperationSettings))
, ShardIdToTableInfo(shardIdToTableInfo)
, AllowOlapDataQuery(tableServiceConfig.GetAllowOlapDataQuery())
, WaitCAStatsTimeout(TDuration::MilliSeconds(tableServiceConfig.GetQueryLimits().GetWaitCAStatsTimeoutMs()))
, AllowOlapDataQuery(executerConfig.TableServiceConfig.GetAllowOlapDataQuery())
, WaitCAStatsTimeout(TDuration::MilliSeconds(executerConfig.TableServiceConfig.GetQueryLimits().GetWaitCAStatsTimeoutMs()))
{
Target = creator;

YQL_ENSURE(!TxManager || tableServiceConfig.GetEnableOltpSink());
YQL_ENSURE(!TxManager || executerConfig.TableServiceConfig.GetEnableOltpSink());
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);

if (Request.AcquireLocksTxId || Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback) {
Expand Down Expand Up @@ -3055,14 +3055,14 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
} // namespace

IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
TKqpRequestCounters::TPtr counters, bool streamResult, const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
TMaybe<TBatchOperationSettings> batchOperationSettings)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerConfig,
std::move(asyncIoFactory), creator, userRequestContext, statementResultIndex, federatedQuerySetup, GUCSettings,
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings));
}
Expand Down
24 changes: 23 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,31 @@ struct TEvKqpExecuter {

struct TKqpFederatedQuerySetup;

struct TExecuterMutableConfig : public TAtomicRefCount<TExecuterMutableConfig>{
std::atomic<bool> EnableRowsDuplicationCheck = false;
std::atomic<bool> EnableParallelPointReadConsolidation = false;
std::atomic<bool> VerboseMemoryLimitException = false;

void ApplyFromTableServiceConfig(const NKikimrConfig::TTableServiceConfig& tableServiceConfig) {
EnableRowsDuplicationCheck.store(tableServiceConfig.GetEnableRowsDuplicationCheck());
EnableParallelPointReadConsolidation.store(tableServiceConfig.GetEnableParallelPointReadConsolidation());
VerboseMemoryLimitException.store(tableServiceConfig.GetResourceManager().GetVerboseMemoryLimitException());
}
};

struct TExecuterConfig : TNonCopyable {
TIntrusivePtr<TExecuterMutableConfig> MutableConfig;
const NKikimrConfig::TTableServiceConfig& TableServiceConfig;

TExecuterConfig( TIntrusivePtr<TExecuterMutableConfig> mutableConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig)
: MutableConfig(mutableConfig)
, TableServiceConfig(tableServiceConfig)
{}
};

IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,

IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TExecuterConfig& executerConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
Expand All @@ -88,7 +88,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false, tableServiceConfig,
std::move(request), database, userToken, counters, false, executerConfig,
std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr,
Expand All @@ -112,7 +112,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false, tableServiceConfig,
std::move(request), database, userToken, counters, false, executerConfig,
std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr,
Expand All @@ -122,14 +122,14 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(
std::move(request), database, userToken, counters,
tableServiceConfig, std::move(asyncIoFactory), preparedQuery, userRequestContext,
executerConfig, std::move(asyncIoFactory), preparedQuery, userRequestContext,
statementResultIndex, federatedQuerySetup, nullptr
);

case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
tableServiceConfig, std::move(asyncIoFactory), creator,
executerConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, GUCSettings,
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings)
Expand Down
26 changes: 14 additions & 12 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ struct TShardRangesWithShardId {
const TShardKeyRanges* Ranges;
};



struct TStageScheduleInfo {
double StageCost = 0.0;
ui32 TaskCount = 0;
Expand Down Expand Up @@ -136,7 +138,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const TExecuterConfig& executerConfig,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex,
ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase",
Expand All @@ -154,32 +156,32 @@ class TKqpExecuterBase : public TActor<TDerived> {
, Counters(counters)
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
, Planner(nullptr)
, ExecuterRetriesConfig(tableServiceConfig.GetExecuterRetriesConfig())
, AggregationSettings(tableServiceConfig.GetAggregationConfig())
, ExecuterRetriesConfig(executerConfig.TableServiceConfig.GetExecuterRetriesConfig())
, AggregationSettings(executerConfig.TableServiceConfig.GetAggregationConfig())
, HasOlapTable(false)
, StreamResult(streamResult)
, StatementResultIndex(statementResultIndex)
, BlockTrackingMode(tableServiceConfig.GetBlockTrackingMode())
, VerboseMemoryLimitException(tableServiceConfig.GetResourceManager().GetVerboseMemoryLimitException())
, BlockTrackingMode(executerConfig.TableServiceConfig.GetBlockTrackingMode())
, VerboseMemoryLimitException(executerConfig.MutableConfig->VerboseMemoryLimitException.load())
, BatchOperationSettings(std::move(batchOperationSettings))
{
if (tableServiceConfig.HasArrayBufferMinFillPercentage()) {
ArrayBufferMinFillPercentage = tableServiceConfig.GetArrayBufferMinFillPercentage();
if (executerConfig.TableServiceConfig.HasArrayBufferMinFillPercentage()) {
ArrayBufferMinFillPercentage = executerConfig.TableServiceConfig.GetArrayBufferMinFillPercentage();
}

EnableReadsMerge = *MergeDatashardReadsControl() == 1;
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
TasksGraph.GetMeta().Database = Database;
TasksGraph.GetMeta().ChannelTransportVersion = tableServiceConfig.GetChannelTransportVersion();
TasksGraph.GetMeta().ChannelTransportVersion = executerConfig.TableServiceConfig.GetChannelTransportVersion();
TasksGraph.GetMeta().UserRequestContext = userRequestContext;
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc, ExecType);
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());

CheckDuplicateRows = tableServiceConfig.GetEnableRowsDuplicationCheck();
EnableParallelPointReadConsolidation = tableServiceConfig.GetEnableParallelPointReadConsolidation();
CheckDuplicateRows = executerConfig.MutableConfig->EnableRowsDuplicationCheck.load();
EnableParallelPointReadConsolidation = executerConfig.MutableConfig->EnableParallelPointReadConsolidation.load();

StartTime = TAppData::TimeProvider->Now();
if (Request.Timeout) {
Expand Down Expand Up @@ -2326,7 +2328,7 @@ class TKqpExecuterBase : public TActor<TDerived> {

IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
Expand All @@ -2335,7 +2337,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TPreparedQueryHolder::TConstPtr preparedQuery,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
, Database(std::move(settings.Database))
, UserToken(std::move(settings.UserToken))
, RequestCounters(std::move(settings.RequestCounters))
, TableServiceConfig(std::move(settings.TableServiceConfig))
, TableServiceConfig(std::move(settings.ExecuterConfig.TableServiceConfig))
, MutableExecuterConfig(std::move(settings.ExecuterConfig.MutableConfig))
, UserRequestContext(std::move(settings.UserRequestContext))
, StatementResultIndex(std::move(settings.StatementResultIndex))
, AsyncIoFactory(std::move(std::move(settings.AsyncIoFactory)))
Expand Down Expand Up @@ -472,8 +473,9 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
auto bufferActorId = RegisterWithSameMailbox(bufferActor);

auto batchSettings = TBatchOperationSettings(partInfo->LimitSize, BatchOperationSettings.MinBatchSize);
const auto executerConfig = TExecuterConfig(MutableExecuterConfig, TableServiceConfig);
auto executerActor = CreateKqpExecuter(std::move(request), Database, UserToken, RequestCounters,
TableServiceConfig, AsyncIoFactory, PreparedQuery, SelfId(), UserRequestContext, StatementResultIndex,
executerConfig, AsyncIoFactory, PreparedQuery, SelfId(), UserRequestContext, StatementResultIndex,
FederatedQuerySetup, GUCSettings, ShardIdToTableInfo, txManager, bufferActorId, std::move(batchSettings));
auto exId = RegisterWithSameMailbox(executerActor);

Expand Down Expand Up @@ -956,6 +958,8 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TKqpRequestCounters::TPtr RequestCounters;
NKikimrConfig::TTableServiceConfig TableServiceConfig;
TIntrusivePtr<TExecuterMutableConfig> MutableExecuterConfig;

TIntrusivePtr<TUserRequestContext> UserRequestContext;
ui32 StatementResultIndex;
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_partitioned_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/kqp/common/kqp_tx.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/protos/table_service_config.pb.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/aclib/aclib.h>
Expand All @@ -22,7 +23,7 @@ struct TKqpPartitionedExecuterSettings {
TString Database;
const TIntrusiveConstPtr<NACLib::TUserToken>& UserToken;
TKqpRequestCounters::TPtr RequestCounters;
const NKikimrConfig::TTableServiceConfig& TableServiceConfig;
const TExecuterConfig& ExecuterConfig;
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
TPreparedQueryHolder::TConstPtr PreparedQuery;
const TIntrusivePtr<TUserRequestContext>& UserRequestContext;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc

TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TPreparedQueryHolder::TConstPtr preparedQuery,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, database, userToken, counters, tableServiceConfig,
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, database, userToken, counters, executerConfig,
userRequestContext, statementResultIndex, TWilsonKqp::ScanExecuter, "ScanExecuter",
false
)
Expand Down Expand Up @@ -329,13 +329,13 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const TExecuterConfig& executerConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TPreparedQueryHolder::TConstPtr preparedQuery,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
{
return new TKqpScanExecuter(std::move(request), database, userToken, counters, tableServiceConfig, std::move(asyncIoFactory),
return new TKqpScanExecuter(std::move(request), database, userToken, counters, executerConfig, std::move(asyncIoFactory),
preparedQuery, userRequestContext, statementResultIndex,
federatedQuerySetup, GUCSettings);
}
Expand Down
Loading
Loading