Skip to content

Setup the buffer page size from config on start #19724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
memoryLimits.MinMemAllocSize = MinMemAllocSize.load();
memoryLimits.MinMemFreeSize = MinMemFreeSize.load();
memoryLimits.ArrayBufferMinFillPercentage = args.Task->GetArrayBufferMinFillPercentage();
if (args.Task->HasBufferPageAllocSize()) {
memoryLimits.BufferPageAllocSize = args.Task->GetBufferPageAllocSize();
}

auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
NRm::TKqpResourcesRequest resourcesRequest;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ class TKqpExecuterBase : public TActor<TDerived> {
ArrayBufferMinFillPercentage = executerConfig.TableServiceConfig.GetArrayBufferMinFillPercentage();
}

if (executerConfig.TableServiceConfig.HasBufferPageAllocSize()) {
BufferPageAllocSize = executerConfig.TableServiceConfig.GetBufferPageAllocSize();
}

EnableReadsMerge = *MergeDatashardReadsControl() == 1;
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
Expand Down Expand Up @@ -1702,6 +1706,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
.CaFactory_ = Request.CaFactory_,
.BlockTrackingMode = BlockTrackingMode,
.ArrayBufferMinFillPercentage = ArrayBufferMinFillPercentage,
.BufferPageAllocSize = BufferPageAllocSize,
.VerboseMemoryLimitException = VerboseMemoryLimitException,
});

Expand Down Expand Up @@ -2313,6 +2318,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const bool VerboseMemoryLimitException;
TMaybe<ui8> ArrayBufferMinFillPercentage;
TMaybe<ui8> BufferPageAllocSize;

ui64 StatCollectInflightBytes = 0;
ui64 StatFinishInflightBytes = 0;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, CaFactory_(args.CaFactory_)
, BlockTrackingMode(args.BlockTrackingMode)
, ArrayBufferMinFillPercentage(args.ArrayBufferMinFillPercentage)
, BufferPageAllocSize(args.BufferPageAllocSize)
, VerboseMemoryLimitException(args.VerboseMemoryLimitException)
{
Y_UNUSED(MkqlMemoryLimit);
Expand Down Expand Up @@ -488,6 +489,10 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
taskDesc->SetArrayBufferMinFillPercentage(*ArrayBufferMinFillPercentage);
}

if (BufferPageAllocSize) {
taskDesc->SetBufferPageAllocSize(*BufferPageAllocSize);
}

auto startResult = CaFactory_->CreateKqpComputeActor({
.ExecuterId = ExecuterId,
.TxId = TxId,
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TKqpPlanner {
const std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory>& CaFactory_;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const TMaybe<ui8> ArrayBufferMinFillPercentage;
const TMaybe<ui8> BufferPageAllocSize;
const bool VerboseMemoryLimitException;
};

Expand Down Expand Up @@ -140,6 +141,7 @@ class TKqpPlanner {
TVector<TProgressStat> LastStats;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const TMaybe<ui8> ArrayBufferMinFillPercentage;
const TMaybe<ui8> BufferPageAllocSize;
const bool VerboseMemoryLimitException;

public:
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,6 @@ message TTableServiceConfig {
optional bool EnableIndexStreamWrite = 88 [default = false];

optional bool EnableParallelPointReadConsolidation = 89 [default = true];

optional uint64 BufferPageAllocSize = 90 [ default = 4096 ];
};
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ struct TComputeMemoryLimits {
ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
ui64 ChunkSizeLimit = 48_MB;
TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer and DqOutputChannel
TMaybe<size_t> BufferPageAllocSize;

IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;
limits.ChunkSizeLimit = this->MemoryLimits.ChunkSizeLimit;
limits.ArrayBufferMinFillPercentage = this->MemoryLimits.ArrayBufferMinFillPercentage;
limits.BufferPageAllocSize = this->MemoryLimits.BufferPageAllocSize;

if (!limits.OutputChunkMaxSize) {
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/proto/dq_tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,5 @@ message TDqTask {
optional bool EnableSpilling = 19;
optional uint32 ArrayBufferMinFillPercentage = 20;
optional bool DisableMetering = 22;
optional uint64 BufferPageAllocSize = 23;
}
4 changes: 1 addition & 3 deletions ydb/library/yql/dq/runtime/dq_output_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ namespace {

using namespace NKikimr;

using NKikimr::NMiniKQL::TPagedBuffer;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

template<bool FastPack>
Expand All @@ -36,7 +34,7 @@ class TDqOutputChannel : public IDqOutputChannel {
const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc,
NDqProto::EDataTransportVersion transportVersion)
: OutputType(outputType)
, Packer(OutputType)
, Packer(OutputType, settings.BufferPageAllocSize)
, Width(OutputType->IsMulti() ? static_cast<NMiniKQL::TMultiType*>(OutputType)->GetElementsCount() : 1u)
, Storage(settings.ChannelStorage)
, HolderFactory(holderFactory)
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/runtime/dq_output_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct TDqOutputChannelSettings {
IDqChannelStorage::TPtr ChannelStorage;
TCollectStatsLevel Level = TCollectStatsLevel::None;
TMaybe<ui8> ArrayBufferMinFillPercentage;
TMaybe<size_t> BufferPageAllocSize;
TMutable MutableSettings;
};

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ class TDqTaskRunner : public IDqTaskRunner {
settings.MaxChunkBytes = memoryLimits.OutputChunkMaxSize;
settings.ChunkSizeLimit = memoryLimits.ChunkSizeLimit;
settings.ArrayBufferMinFillPercentage = memoryLimits.ArrayBufferMinFillPercentage;
settings.BufferPageAllocSize = memoryLimits.BufferPageAllocSize;
settings.TransportVersion = outputChannelDesc.GetTransportVersion();
settings.Level = StatsModeToCollectStatsLevel(Settings.StatsMode);

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ struct TDqTaskRunnerMemoryLimits {
ui32 OutputChunkMaxSize = 0;
ui32 ChunkSizeLimit = 48_MB;
TMaybe<ui8> ArrayBufferMinFillPercentage;
TMaybe<size_t> BufferPageAllocSize;
};

NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type,
Expand Down
Loading