Skip to content

Commit ce1ba2a

Browse files
authored
Merge f30a0dd into ecb9d76
2 parents ecb9d76 + f30a0dd commit ce1ba2a

File tree

12 files changed

+25
-3
lines changed

12 files changed

+25
-3
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
124124
memoryLimits.MinMemAllocSize = MinMemAllocSize.load();
125125
memoryLimits.MinMemFreeSize = MinMemFreeSize.load();
126126
memoryLimits.ArrayBufferMinFillPercentage = args.Task->GetArrayBufferMinFillPercentage();
127+
if (args.Task->HasBufferPageAllocSize()) {
128+
memoryLimits.BufferPageAllocSize = args.Task->GetBufferPageAllocSize();
129+
}
127130

128131
auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
129132
NRm::TKqpResourcesRequest resourcesRequest;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ class TKqpExecuterBase : public TActor<TDerived> {
169169
ArrayBufferMinFillPercentage = executerConfig.TableServiceConfig.GetArrayBufferMinFillPercentage();
170170
}
171171

172+
if (executerConfig.TableServiceConfig.HasBufferPageAllocSize()) {
173+
BufferPageAllocSize = executerConfig.TableServiceConfig.GetBufferPageAllocSize();
174+
}
175+
172176
EnableReadsMerge = *MergeDatashardReadsControl() == 1;
173177
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
174178
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
@@ -1702,6 +1706,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17021706
.CaFactory_ = Request.CaFactory_,
17031707
.BlockTrackingMode = BlockTrackingMode,
17041708
.ArrayBufferMinFillPercentage = ArrayBufferMinFillPercentage,
1709+
.BufferPageAllocSize = BufferPageAllocSize,
17051710
.VerboseMemoryLimitException = VerboseMemoryLimitException,
17061711
});
17071712

@@ -2313,6 +2318,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
23132318
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
23142319
const bool VerboseMemoryLimitException;
23152320
TMaybe<ui8> ArrayBufferMinFillPercentage;
2321+
TMaybe<ui8> BufferPageAllocSize;
23162322

23172323
ui64 StatCollectInflightBytes = 0;
23182324
ui64 StatFinishInflightBytes = 0;

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
105105
, CaFactory_(args.CaFactory_)
106106
, BlockTrackingMode(args.BlockTrackingMode)
107107
, ArrayBufferMinFillPercentage(args.ArrayBufferMinFillPercentage)
108+
, BufferPageAllocSize(args.BufferPageAllocSize)
108109
, VerboseMemoryLimitException(args.VerboseMemoryLimitException)
109110
{
110111
Y_UNUSED(MkqlMemoryLimit);
@@ -488,6 +489,10 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
488489
taskDesc->SetArrayBufferMinFillPercentage(*ArrayBufferMinFillPercentage);
489490
}
490491

492+
if (BufferPageAllocSize) {
493+
taskDesc->SetBufferPageAllocSize(*BufferPageAllocSize);
494+
}
495+
491496
auto startResult = CaFactory_->CreateKqpComputeActor({
492497
.ExecuterId = ExecuterId,
493498
.TxId = TxId,

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class TKqpPlanner {
6565
const std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory>& CaFactory_;
6666
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
6767
const TMaybe<ui8> ArrayBufferMinFillPercentage;
68+
const TMaybe<ui8> BufferPageAllocSize;
6869
const bool VerboseMemoryLimitException;
6970
};
7071

@@ -140,6 +141,7 @@ class TKqpPlanner {
140141
TVector<TProgressStat> LastStats;
141142
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
142143
const TMaybe<ui8> ArrayBufferMinFillPercentage;
144+
const TMaybe<ui8> BufferPageAllocSize;
143145
const bool VerboseMemoryLimitException;
144146

145147
public:

ydb/core/protos/table_service_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,6 @@ message TTableServiceConfig {
394394
optional bool EnableIndexStreamWrite = 88 [default = false];
395395

396396
optional bool EnableParallelPointReadConsolidation = 89 [default = true];
397+
398+
optional uint64 BufferPageAllocSize = 90 [ default = 4096 ];
397399
};

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ struct TComputeMemoryLimits {
380380
ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
381381
ui64 ChunkSizeLimit = 48_MB;
382382
TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer and DqOutputChannel
383+
TMaybe<size_t> BufferPageAllocSize;
383384

384385
IMemoryQuotaManager::TPtr MemoryQuotaManager;
385386
};

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
216216
limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;
217217
limits.ChunkSizeLimit = this->MemoryLimits.ChunkSizeLimit;
218218
limits.ArrayBufferMinFillPercentage = this->MemoryLimits.ArrayBufferMinFillPercentage;
219+
limits.BufferPageAllocSize = this->MemoryLimits.BufferPageAllocSize;
219220

220221
if (!limits.OutputChunkMaxSize) {
221222
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;

ydb/library/yql/dq/proto/dq_tasks.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,5 @@ message TDqTask {
227227
optional bool EnableSpilling = 19;
228228
optional uint32 ArrayBufferMinFillPercentage = 20;
229229
optional bool DisableMetering = 22;
230+
optional uint64 BufferPageAllocSize = 23;
230231
}

ydb/library/yql/dq/runtime/dq_output_channel.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ namespace {
2222

2323
using namespace NKikimr;
2424

25-
using NKikimr::NMiniKQL::TPagedBuffer;
26-
2725
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2826

2927
template<bool FastPack>
@@ -36,7 +34,7 @@ class TDqOutputChannel : public IDqOutputChannel {
3634
const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc,
3735
NDqProto::EDataTransportVersion transportVersion)
3836
: OutputType(outputType)
39-
, Packer(OutputType)
37+
, Packer(OutputType, settings.BufferPageAllocSize)
4038
, Width(OutputType->IsMulti() ? static_cast<NMiniKQL::TMultiType*>(OutputType)->GetElementsCount() : 1u)
4139
, Storage(settings.ChannelStorage)
4240
, HolderFactory(holderFactory)

ydb/library/yql/dq/runtime/dq_output_channel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ struct TDqOutputChannelSettings {
3737
IDqChannelStorage::TPtr ChannelStorage;
3838
TCollectStatsLevel Level = TCollectStatsLevel::None;
3939
TMaybe<ui8> ArrayBufferMinFillPercentage;
40+
TMaybe<size_t> BufferPageAllocSize;
4041
TMutable MutableSettings;
4142
};
4243

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ class TDqTaskRunner : public IDqTaskRunner {
683683
settings.MaxChunkBytes = memoryLimits.OutputChunkMaxSize;
684684
settings.ChunkSizeLimit = memoryLimits.ChunkSizeLimit;
685685
settings.ArrayBufferMinFillPercentage = memoryLimits.ArrayBufferMinFillPercentage;
686+
settings.BufferPageAllocSize = memoryLimits.BufferPageAllocSize;
686687
settings.TransportVersion = outputChannelDesc.GetTransportVersion();
687688
settings.Level = StatsModeToCollectStatsLevel(Settings.StatsMode);
688689

ydb/library/yql/dq/runtime/dq_tasks_runner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ struct TDqTaskRunnerMemoryLimits {
220220
ui32 OutputChunkMaxSize = 0;
221221
ui32 ChunkSizeLimit = 48_MB;
222222
TMaybe<ui8> ArrayBufferMinFillPercentage;
223+
TMaybe<size_t> BufferPageAllocSize;
223224
};
224225

225226
NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type,

0 commit comments

Comments
 (0)