1 change: 1 addition & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp
@@ -24,6 +24,7 @@ class TApplyWrapper: public TMutableCodegeneratorPtrNode<TApplyWrapper> {
, ValueBuilder(HolderFactory, NUdf::EValidatePolicy::Exception)
, Args(argsCount)
{
Alloc.Ref().EnableArrowTracking = false;
Alloc.Release();
}

1 change: 1 addition & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp
@@ -55,6 +55,7 @@ class TScalarApplyWrapper : public TMutableComputationNode<TScalarApplyWrapper>
nullptr, Alloc.Ref(), *RandomProvider, *TimeProvider, NUdf::EValidatePolicy::Exception, nullptr),
originalContext.Mutables, *NYql::NUdf::GetYqlMemoryPool())
{
Alloc.Ref().EnableArrowTracking = false;
Alloc.Release();
}
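Both apply wrappers above own a private TScopedAlloc and clear EnableArrowTracking on it before Alloc.Release(). With the flag cleared, Arrow blocks allocated under this allocator are left unlinked and are not charged to it (see MKQLArrowAllocate in mkql_alloc.cpp below). A minimal sketch of the pattern, with the TScopedAlloc constructor arguments simplified:

// Sketch only: a node that owns its allocator and opts out of Arrow tracking.
// TScopedAlloc takes additional constructor arguments in the real code.
class TUntrackedNodeSketch {
public:
    explicit TUntrackedNodeSketch(const TSourceLocation& location)
        : Alloc(location)
    {
        Alloc.Ref().EnableArrowTracking = false; // Arrow blocks won't be linked into ArrowBlocksRoot
        Alloc.Release();                         // detach the allocator from the current thread
    }

private:
    TScopedAlloc Alloc;
};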

13 changes: 9 additions & 4 deletions ydb/library/yql/minikql/computation/mkql_block_transport.cpp
@@ -10,6 +10,11 @@ namespace NKikimr::NMiniKQL {

namespace {

TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
MKQLArrowUntrack(owner->data());
return NYql::MakeReadOnlyRope(owner, data, size);
}

class TOwnedArrowBuffer : public arrow::Buffer {
public:
TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner)
@@ -76,7 +81,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes));
}

void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
@@ -205,7 +210,7 @@ class TFixedSizeBlockSerializer final : public IBlockSerializer {
const ui64 desiredOffset = data.offset % 8;
const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes));
}
};

@@ -277,11 +282,11 @@ class TStringBlockSerializer final : public IBlockSerializer {
const ui64 desiredOffset = data.offset % 8;
const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize));

const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
size_t mainSize = data.buffers[2]->size();
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize));
}
};
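Every buffer handed to the transport layer now goes through MakeReadOnlyRopeAndUntrack, which calls MKQLArrowUntrack on the owning buffer's base pointer before building the read-only rope. Untracking unlinks the block from the producing allocator's ArrowBlocksRoot, so the rope and its consumers can outlive that allocator without the block being reclaimed by CleanupArrowList. A rough sketch of the idea, with a hypothetical SendBlock helper standing in for the serializers above:

// Sketch only: SendBlock is a hypothetical stand-in for the serializer code.
// The order matters: untrack first, then let the rope share ownership of the
// arrow::Buffer, so the producing allocator no longer accounts for (or cleans
// up) memory that has left its scope.
void SendBlock(const std::shared_ptr<const arrow::Buffer>& buffer, TRope& dst) {
    MKQLArrowUntrack(buffer->data()); // detach from TlsAllocState's ArrowBlocksRoot
    dst.Insert(dst.End(), NYql::MakeReadOnlyRope(
        buffer, reinterpret_cast<const char*>(buffer->data()), buffer->size()));
}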

64 changes: 58 additions & 6 deletions ydb/library/yql/minikql/mkql_alloc.cpp
@@ -20,7 +20,7 @@ void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {

void TAllocState::TListEntry::Unlink() noexcept {
std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
Left = Right = nullptr;
Clear();
}

TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters &counters, bool supportsSizedAllocators)
@@ -31,20 +31,33 @@ TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAligne
GetRoot()->InitLinks();
OffloadedBlocksRoot.InitLinks();
GlobalPAllocList.InitLinks();
ArrowBlocksRoot.InitLinks();
}

void TAllocState::CleanupPAllocList(TListEntry* root) {
for (auto curr = root->Right; curr != root; ) {
auto next = curr->Right;
auto size = ((TMkqlPAllocHeader*)curr)->Size;
auto fullSize = size + sizeof(TMkqlPAllocHeader);
auto fullSize = size + sizeof(TMkqlPAllocHeader);
MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
curr = next;
}

root->InitLinks();
}
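// CleanupArrowList (below) mirrors CleanupPAllocList: any Arrow block still
// linked into ArrowBlocksRoot when the allocator is torn down has its full
// page (payload plus TMkqlArrowHeader) returned through ReleaseAlignedPage.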

void TAllocState::CleanupArrowList(TListEntry* root) {
for (auto curr = root->Right; curr != root; ) {
auto next = curr->Right;
auto size = ((TMkqlArrowHeader*)curr)->Size;
auto fullSize = size + sizeof(TMkqlArrowHeader);
ReleaseAlignedPage(curr, fullSize);
curr = next;
}

root->InitLinks();
}

void TAllocState::KillAllBoxed() {
{
const auto root = GetRoot();
@@ -72,6 +85,8 @@ void TAllocState::KillAllBoxed() {
OffloadedBlocksRoot.InitLinks();
}

CleanupArrowList(&ArrowBlocksRoot);

#ifndef NDEBUG
ActiveMemInfo.clear();
#endif
@@ -230,18 +245,55 @@ void TPagedArena::Clear() noexcept {
}

void* MKQLArrowAllocate(ui64 size) {
return GetAlignedPage(size);
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
auto fullSize = size + sizeof(TMkqlArrowHeader);
if (state->EnableArrowTracking) {
state->OffloadAlloc(fullSize);
}

auto ptr = GetAlignedPage(fullSize);
auto header = (TMkqlArrowHeader*)ptr;
if (state->EnableArrowTracking) {
header->Entry.Link(&state->ArrowBlocksRoot);
} else {
header->Entry.Clear();
}

header->Size = size;
return header + 1;
}

void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
auto res = GetAlignedPage(size);
auto res = MKQLArrowAllocate(size);
memcpy(res, mem, Min(prevSize, size));
ReleaseAlignedPage(const_cast<void*>(mem), prevSize);
MKQLArrowFree(mem, prevSize);
return res;
}

void MKQLArrowFree(const void* mem, ui64 size) {
ReleaseAlignedPage(const_cast<void*>(mem), size);
auto fullSize = size + sizeof(TMkqlArrowHeader);
auto header = ((TMkqlArrowHeader*)mem) - 1;
if (!header->Entry.IsUnlinked()) {
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
state->OffloadFree(fullSize);
header->Entry.Unlink();
}

Y_ENSURE(size == header->Size);
ReleaseAlignedPage(header, fullSize);
}

void MKQLArrowUntrack(const void* mem) {
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
auto header = ((TMkqlArrowHeader*)mem) - 1;
if (!header->Entry.IsUnlinked()) {
header->Entry.Unlink();
auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
state->OffloadFree(fullSize);
}
}

} // NMiniKQL
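Taken together, the allocation path now works roughly as follows: MKQLArrowAllocate reserves sizeof(TMkqlArrowHeader) extra bytes, records the requested size in the header, and, when EnableArrowTracking is set, links the header into the per-allocator ArrowBlocksRoot and charges the bytes via OffloadAlloc; MKQLArrowFree reverses both steps, and MKQLArrowUntrack only detaches the block so another owner can release it later. A minimal sketch of the lifecycle, assuming TlsAllocState points at a live allocator with EnableArrowTracking left at its default of true:

// Sketch only: illustrates the entry points changed above.
void ArrowLifecycleSketch() {
    void* tracked = MKQLArrowAllocate(1024);                   // header linked into ArrowBlocksRoot, size + header charged
    void* grown   = MKQLArrowReallocate(tracked, 1024, 4096);  // allocate, copy, then free the old block
    MKQLArrowFree(grown, 4096);                                // unlinks the header and returns the charge

    void* shared = MKQLArrowAllocate(1024);
    MKQLArrowUntrack(shared);                                  // detached: CleanupArrowList will no longer touch it
    MKQLArrowFree(shared, 1024);                               // still releases the page, but skips OffloadFree
}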
17 changes: 16 additions & 1 deletion ydb/library/yql/minikql/mkql_alloc.h
@@ -50,6 +50,8 @@ struct TAllocState : public TAlignedPagePool
void Link(TListEntry* root) noexcept;
void Unlink() noexcept;
void InitLinks() noexcept { Left = Right = this; }
void Clear() noexcept { Left = Right = nullptr; }
bool IsUnlinked() const noexcept { return !Left && !Right; }
};

#ifndef NDEBUG
@@ -74,7 +76,9 @@ struct TAllocState : public TAlignedPagePool
TListEntry OffloadedBlocksRoot;
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
std::shared_ptr<std::atomic<size_t>> ArrowMemoryUsage = std::make_shared<std::atomic<size_t>>();
TListEntry ArrowBlocksRoot;
bool EnableArrowTracking = true;

void* MainContext = nullptr;
void* CurrentContext = nullptr;

@@ -97,6 +101,7 @@ struct TAllocState : public TAlignedPagePool
void InvalidateMemInfo();
size_t GetDeallocatedInPages() const;
static void CleanupPAllocList(TListEntry* root);
static void CleanupArrowList(TListEntry* root);

void LockObject(::NKikimr::NUdf::TUnboxedValuePod value);
void UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value);
@@ -163,6 +168,15 @@ static_assert(sizeof(TMkqlPAllocHeader) ==
sizeof(TAllocState::TListEntry) +
sizeof(void*), "Padding is not allowed");

constexpr size_t ArrowAlignment = 64;
struct TMkqlArrowHeader {
TAllocState::TListEntry Entry;
ui64 Size;
char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
};

static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);

class TScopedAlloc {
public:
explicit TScopedAlloc(const TSourceLocation& location,
@@ -410,6 +424,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
void* MKQLArrowAllocate(ui64 size);
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
void MKQLArrowFree(const void* mem, ui64 size);
void MKQLArrowUntrack(const void* mem);

template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
struct TWithMiniKQLAlloc {