Skip to content

YQ-3655 added block splitting into DQ output channel #15330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
std::atomic<ui64> MinChannelBufferSize = 0;
std::atomic<ui64> ChannelChunkSizeLimit = 48_MB;
std::atomic<ui64> MinMemAllocSize = 8_MB;
std::atomic<ui64> MinMemFreeSize = 32_MB;

Expand All @@ -106,6 +107,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
ChannelChunkSizeLimit.store(config.GetChannelChunkSizeLimit());
MinMemAllocSize.store(config.GetMinMemAllocSize());
MinMemFreeSize.store(config.GetMinMemFreeSize());
}
Expand Down Expand Up @@ -142,6 +144,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {

memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
memoryLimits.ChunkSizeLimit = ChannelChunkSizeLimit.load();
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
("ch_size", estimation.ChannelBufferMemoryLimit)
("ch_count", estimation.ChannelBuffersCount)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message TTableServiceConfig {
}
optional uint32 ComputeActorsCount = 1 [default = 10000];
optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB
optional uint64 ChannelChunkSizeLimit = 30 [default = 50331648]; // 48 MB
reserved 3;
optional uint64 MkqlLightProgramMemoryLimit = 4 [default = 1048576]; // 1 MiB
optional uint64 MkqlHeavyProgramMemoryLimit = 5 [default = 31457280]; // 30 MB
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ struct TComputeMemoryLimits {
ui64 MinMemAllocSize = 30_MB;
ui64 MinMemFreeSize = 30_MB;
ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
ui64 ChunkSizeLimit = 48_MB;
TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer

IMemoryQuotaManager::TPtr MemoryQuotaManager;
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

Channels->SetOutputChannelPeer(channelUpdate.GetId(), peer);
outputChannel->HasPeer = true;
if (outputChannel->Channel) {
outputChannel->Channel->UpdateSettings({.IsLocalChannel = peer.NodeId() == this->SelfId().NodeId()});
}

continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
TDqTaskRunnerMemoryLimits limits;
limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize;
limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;
limits.ChunkSizeLimit = this->MemoryLimits.ChunkSizeLimit;

if (!limits.OutputChunkMaxSize) {
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
}
}

if (this->Task.GetEnableSpilling()) {
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
Expand Down
219 changes: 216 additions & 3 deletions ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
#include "dq_arrow_helpers.h"

#include <cstddef>
#include <yql/essentials/public/udf/udf_value.h>
#include <yql/essentials/minikql/defs.h>
#include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/defs.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/public/udf/arrow/defs.h>
#include <yql/essentials/public/udf/arrow/memory_pool.h>
#include <yql/essentials/public/udf/arrow/util.h>
#include <yql/essentials/public/udf/udf_value.h>

#include <ydb/library/formats/arrow/size_calcer.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>

Expand Down Expand Up @@ -942,6 +947,214 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
return (*batch)->column(0);
}

// Block splitter

namespace {

// Splits oversized wide-block items (one datum per column) into chunks whose
// estimated memory footprint fits under ChunkSizeLimit. Scalar columns are
// replicated into every chunk; array columns are sliced by rows.
class TBlockSplitter : public IBlockSplitter {
    // A (possibly partial) block item: per-column datums plus cached size
    // bookkeeping. Scalars contribute a fixed ScalarsSize to every chunk;
    // array columns (tracked via ArraysIdx) are the part that shrinks when
    // the item is split.
    class TItem {
    public:
        TItem(TBlockSplitter& self, const NUdf::TUnboxedValuePod* values)
            : Self(self)
        {
            Data.reserve(Self.Width);
            ArraysIdx.reserve(Self.Width);
            for (ui64 i = 0; i < Self.Width; ++i) {
                auto datum = TBlockSplitter::ExtractDatum(values[i]);
                if (datum.is_scalar()) {
                    // Scalars are copied into every produced chunk, so their
                    // size is accounted once and carried through each split.
                    ScalarsSize += Self.GetDatumMemorySize(i, datum);
                } else {
                    ArraysIdx.emplace_back(i);
                }
                Data.emplace_back(std::move(datum));
            }

            // Last column is the scalar Uint64 block-length column
            // (enforced by CreateBlockSplitter before this class is built).
            NumberRows = Data.back().scalar_as<arrow::UInt64Scalar>().value;
            UpdateArraysSize();
        }

        // Constructs a chunk produced by PopBack(): data already sliced,
        // scalar size inherited from the parent item.
        TItem(TBlockSplitter& self, std::vector<arrow::Datum>&& data, const std::vector<ui64>& arraysIdx, ui64 numberRows, ui64 scalarsSize)
            : Self(self)
            , Data(std::move(data))
            , ArraysIdx(arraysIdx)
            , NumberRows(numberRows)
            , ScalarsSize(scalarsSize)
        {
            UpdateArraysSize();
        }

        ui64 GetNumberRows() const {
            return NumberRows;
        }

        // Estimated memory footprint of this item (scalars + array buffers).
        ui64 GetSize() const {
            return ScalarsSize + ArraysSize;
        }

        // Finalizes the item: trims array buffers to their actual length
        // (dropping slack from slicing) and rewrites the trailing
        // block-length scalar to this item's row count. Leaves Data moved-from.
        std::vector<arrow::Datum> ExtractData() {
            std::vector<arrow::Datum> result(std::move(Data));
            for (ui64 i : ArraysIdx) {
                result[i] = Self.GetColumnTrimmer(i).Trim(result[i].array());
            }
            result.back() = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(NumberRows));
            return result;
        }

        // Splits off the last `length` rows into a new TItem, keeping the
        // first (NumberRows - length) rows in this one. Scalar columns are
        // shared by both halves.
        TItem PopBack(ui64 length) {
            MKQL_ENSURE(length <= NumberRows, "Can not pop more than number of rows");
            std::vector<arrow::Datum> backData = Data;
            for (ui64 i : ArraysIdx) {
                // NUdf::Chop: Data[i] keeps the leading (NumberRows - length)
                // rows, `array` is left holding the trailing rows — presumably
                // a zero-copy slice of the shared buffers (TODO confirm).
                auto array = Data[i].array();
                Data[i] = NUdf::Chop(array, NumberRows - length);
                backData[i] = array;
            }

            NumberRows -= length;
            UpdateArraysSize();

            return TItem(Self, std::move(backData), ArraysIdx, length, ScalarsSize);
        }

    private:
        // Recomputes the cached total size of array columns after slicing.
        void UpdateArraysSize() {
            ArraysSize = 0;
            for (ui64 i : ArraysIdx) {
                ArraysSize += NKikimr::NArrow::GetArrayDataSize(Data[i].make_array());
            }
        }

    private:
        TBlockSplitter& Self;
        std::vector<arrow::Datum> Data;     // one datum per column, scalars + arrays
        std::vector<ui64> ArraysIdx;        // indices of array (non-scalar) columns
        ui64 NumberRows = 0;
        ui64 ScalarsSize = 0;               // fixed per-chunk scalar overhead
        ui64 ArraysSize = 0;                // current sliced-array footprint
    };

public:
    TBlockSplitter(const TVector<const TBlockType*>& items, ui64 chunkSizeLimit, arrow::MemoryPool* pool)
        : Items(items)
        , Width(items.size())
        , ChunkSizeLimit(chunkSizeLimit)
        , ArrowPool(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
        , ScalarSizes(Width)
        , BlockTrimmers(Width)
    {}

    // Returns true when the item's estimated memory size exceeds the chunk
    // limit and SplitItem() should be invoked.
    bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
        MKQL_ENSURE(count == Width, "Invalid width");

        ui64 itemSize = 0;
        for (size_t i = 0; i < Width; ++i) {
            itemSize += GetDatumMemorySize(i, ExtractDatum(values[i]));
        }
        return itemSize > ChunkSizeLimit;
    }

    // Repeatedly halves the item by row count until every piece fits under
    // ChunkSizeLimit. Throws if a single row alone is over the limit.
    std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
        MKQL_ENSURE(count == Width, "Invalid width");

        SplitStack.clear();
        SplitStack.emplace_back(*this, values);
        std::vector<std::vector<arrow::Datum>> result;

        // Rough expected chunk count used only for reserve().
        const auto estimatedSize = SplitStack.back().GetSize() / std::max(ChunkSizeLimit, ui64(1));
        result.reserve(estimatedSize);
        SplitStack.reserve(estimatedSize);
        while (!SplitStack.empty()) {
            auto item = std::move(SplitStack.back());
            SplitStack.pop_back();

            while (item.GetSize() > ChunkSizeLimit) {
                if (item.GetNumberRows() <= 1) {
                    throw yexception() << "Row size in block is " << item.GetSize() << ", that is larger than allowed limit " << ChunkSizeLimit;
                }
                // Defer the tail half; keep halving the head in place.
                SplitStack.emplace_back(item.PopBack(item.GetNumberRows() / 2));
            }
            result.emplace_back(item.ExtractData());
        }
        return result;
    }

private:
    static arrow::Datum ExtractDatum(const NUdf::TUnboxedValuePod& value) {
        arrow::Datum datum = TArrowBlock::From(value).GetDatum();
        MKQL_ENSURE(datum.is_array() || datum.is_scalar(), "Expecting array or scalar");
        return datum;
    }

    // Memory size of a datum; scalar sizes are computed once per column and
    // cached (a scalar's size does not depend on the particular value's row).
    ui64 GetDatumMemorySize(ui64 index, const arrow::Datum& datum) {
        MKQL_ENSURE(index < Width, "Invalid index");
        if (datum.is_array()) {
            return NKikimr::NArrow::GetArrayMemorySize(datum.array());
        }

        if (!ScalarSizes[index]) {
            // Size a scalar by materializing a one-element array from it.
            const auto& array = ARROW_RESULT(arrow::MakeArrayFromScalar(*datum.scalar(), 1));
            ScalarSizes[index] = NKikimr::NArrow::GetArrayMemorySize(array->data());
        }
        return *ScalarSizes[index];
    }

    // Lazily creates a per-column trimmer used by TItem::ExtractData().
    IBlockTrimmer& GetColumnTrimmer(ui64 index) {
        MKQL_ENSURE(index < Width, "Invalid index");
        if (!BlockTrimmers[index]) {
            BlockTrimmers[index] = MakeBlockTrimmer(TTypeInfoHelper(), Items[index]->GetItemType(), &ArrowPool);
        }
        return *BlockTrimmers[index];
    }

private:
    const TVector<const TBlockType*> Items;  // per-column block types
    const ui64 Width;                        // column count (== Items.size())
    const ui64 ChunkSizeLimit;               // max estimated bytes per chunk
    arrow::MemoryPool& ArrowPool;

    std::vector<std::optional<ui64>> ScalarSizes;    // cached per-column scalar sizes
    std::vector<IBlockTrimmer::TPtr> BlockTrimmers;  // lazily-built per-column trimmers
    std::vector<TItem> SplitStack;                   // pending pieces during SplitItem
};

} // namespace

// Creates a splitter for wide block outputs, or nullptr when `type` is not a
// splittable wide-block layout.
//
// The required layout is a non-empty Multi type whose every element is a
// block type and whose last element is a Scalar block of Data type Uint64
// (the per-block row count). Any other shape returns nullptr, meaning the
// output channel must not attempt block splitting.
//
// `chunkSizeLimit` caps the estimated byte size of each produced chunk;
// `pool` is an optional arrow memory pool (defaults inside TBlockSplitter
// to the YQL memory pool when null).
IBlockSplitter::TPtr CreateBlockSplitter(const TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool) {
    if (!type->IsMulti()) {
        return nullptr;
    }

    const TMultiType* multiType = static_cast<const TMultiType*>(type);
    const ui32 width = multiType->GetElementsCount();
    if (!width) {
        return nullptr;
    }

    TVector<const TBlockType*> items;
    items.reserve(width);
    for (ui32 i = 0; i < width; i++) {
        // Renamed from `type`, which shadowed the function parameter.
        const auto elementType = multiType->GetElementType(i);
        if (!elementType->IsBlock()) {
            return nullptr;
        }

        const TBlockType* blockType = static_cast<const TBlockType*>(elementType);
        if (i == width - 1) {
            // The trailing column must be the scalar Uint64 block-length column.
            if (blockType->GetShape() != TBlockType::EShape::Scalar) {
                return nullptr;
            }
            if (!blockType->GetItemType()->IsData()) {
                return nullptr;
            }
            if (static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() != NUdf::EDataSlot::Uint64) {
                return nullptr;
            }
        }

        items.push_back(blockType);
    }

    return MakeIntrusive<TBlockSplitter>(items, chunkSizeLimit, pool);
}

} // namespace NArrow
} // namespace NYql

10 changes: 10 additions & 0 deletions ydb/library/yql/dq/runtime/dq_arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
*/
void AppendElement(NYql::NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NKikimr::NMiniKQL::TType* type);

// Splits oversized wide-block items into chunks bounded by a byte-size limit.
// Implementations are ref-counted (TThrRefBase); obtain one via
// CreateBlockSplitter below.
class IBlockSplitter : public TThrRefBase {
public:
    using TPtr = TIntrusivePtr<IBlockSplitter>;

    // Returns true when the item (`count` per-column values) is estimated to
    // exceed the configured chunk size limit and should be passed to SplitItem.
    virtual bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;

    // Splits the item into pieces, each a per-column datum vector whose
    // estimated size fits under the configured limit.
    virtual std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;
};

// Returns a splitter for `type`, or nullptr when `type` is not a wide block
// type eligible for splitting. `pool` defaults to the YQL arrow memory pool.
IBlockSplitter::TPtr CreateBlockSplitter(const NKikimr::NMiniKQL::TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool = nullptr);

} // NArrow
} // NYql
Loading
Loading