Skip to content

Commit 06f4c88

Browse files
authored
Arrow memory tracking (#6731)
1 parent 5e926e1 commit 06f4c88

File tree

5 files changed

+85
-11
lines changed

5 files changed

+85
-11
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class TApplyWrapper: public TMutableCodegeneratorPtrNode<TApplyWrapper> {
2424
, ValueBuilder(HolderFactory, NUdf::EValidatePolicy::Exception)
2525
, Args(argsCount)
2626
{
27+
Alloc.Ref().EnableArrowTracking = false;
2728
Alloc.Release();
2829
}
2930

ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class TScalarApplyWrapper : public TMutableComputationNode<TScalarApplyWrapper>
5555
nullptr, Alloc.Ref(), *RandomProvider, *TimeProvider, NUdf::EValidatePolicy::Exception, nullptr),
5656
originalContext.Mutables, *NYql::NUdf::GetYqlMemoryPool())
5757
{
58+
Alloc.Ref().EnableArrowTracking = false;
5859
Alloc.Release();
5960
}
6061

ydb/library/yql/minikql/computation/mkql_block_transport.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ namespace NKikimr::NMiniKQL {
1010

1111
namespace {
1212

13+
TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
14+
MKQLArrowUntrack(owner->data());
15+
return NYql::MakeReadOnlyRope(owner, data, size);
16+
}
17+
1318
class TOwnedArrowBuffer : public arrow::Buffer {
1419
public:
1520
TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner)
@@ -76,7 +81,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
7681
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
7782
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
7883
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
79-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes));
84+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes));
8085
}
8186

8287
void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
@@ -205,7 +210,7 @@ class TFixedSizeBlockSerializer final : public IBlockSerializer {
205210
const ui64 desiredOffset = data.offset % 8;
206211
const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
207212
size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
208-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes));
213+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes));
209214
}
210215
};
211216

@@ -277,11 +282,11 @@ class TStringBlockSerializer final : public IBlockSerializer {
277282
const ui64 desiredOffset = data.offset % 8;
278283
const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
279284
size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
280-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize));
285+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize));
281286

282287
const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
283288
size_t mainSize = data.buffers[2]->size();
284-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize));
289+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize));
285290
}
286291
};
287292

ydb/library/yql/minikql/mkql_alloc.cpp

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {
2020

2121
void TAllocState::TListEntry::Unlink() noexcept {
2222
std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
23-
Left = Right = nullptr;
23+
Clear();
2424
}
2525

2626
TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters &counters, bool supportsSizedAllocators)
@@ -31,20 +31,33 @@ TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAligne
3131
GetRoot()->InitLinks();
3232
OffloadedBlocksRoot.InitLinks();
3333
GlobalPAllocList.InitLinks();
34+
ArrowBlocksRoot.InitLinks();
3435
}
3536

3637
void TAllocState::CleanupPAllocList(TListEntry* root) {
3738
for (auto curr = root->Right; curr != root; ) {
3839
auto next = curr->Right;
3940
auto size = ((TMkqlPAllocHeader*)curr)->Size;
40-
auto fullSize = size + sizeof(TMkqlPAllocHeader);
41+
auto fullSize = size + sizeof(TMkqlPAllocHeader);
4142
MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
4243
curr = next;
4344
}
4445

4546
root->InitLinks();
4647
}
4748

49+
void TAllocState::CleanupArrowList(TListEntry* root) {
50+
for (auto curr = root->Right; curr != root; ) {
51+
auto next = curr->Right;
52+
auto size = ((TMkqlArrowHeader*)curr)->Size;
53+
auto fullSize = size + sizeof(TMkqlArrowHeader);
54+
ReleaseAlignedPage(curr, fullSize);
55+
curr = next;
56+
}
57+
58+
root->InitLinks();
59+
}
60+
4861
void TAllocState::KillAllBoxed() {
4962
{
5063
const auto root = GetRoot();
@@ -72,6 +85,8 @@ void TAllocState::KillAllBoxed() {
7285
OffloadedBlocksRoot.InitLinks();
7386
}
7487

88+
CleanupArrowList(&ArrowBlocksRoot);
89+
7590
#ifndef NDEBUG
7691
ActiveMemInfo.clear();
7792
#endif
@@ -230,18 +245,55 @@ void TPagedArena::Clear() noexcept {
230245
}
231246

232247
void* MKQLArrowAllocate(ui64 size) {
233-
return GetAlignedPage(size);
248+
TAllocState* state = TlsAllocState;
249+
Y_ENSURE(state);
250+
auto fullSize = size + sizeof(TMkqlArrowHeader);
251+
if (state->EnableArrowTracking) {
252+
state->OffloadAlloc(fullSize);
253+
}
254+
255+
auto ptr = GetAlignedPage(fullSize);
256+
auto header = (TMkqlArrowHeader*)ptr;
257+
if (state->EnableArrowTracking) {
258+
header->Entry.Link(&state->ArrowBlocksRoot);
259+
} else {
260+
header->Entry.Clear();
261+
}
262+
263+
header->Size = size;
264+
return header + 1;
234265
}
235266

236267
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
237-
auto res = GetAlignedPage(size);
268+
auto res = MKQLArrowAllocate(size);
238269
memcpy(res, mem, Min(prevSize, size));
239-
ReleaseAlignedPage(const_cast<void*>(mem), prevSize);
270+
MKQLArrowFree(mem, prevSize);
240271
return res;
241272
}
242273

243274
void MKQLArrowFree(const void* mem, ui64 size) {
244-
ReleaseAlignedPage(const_cast<void*>(mem), size);
275+
auto fullSize = size + sizeof(TMkqlArrowHeader);
276+
auto header = ((TMkqlArrowHeader*)mem) - 1;
277+
if (!header->Entry.IsUnlinked()) {
278+
TAllocState* state = TlsAllocState;
279+
Y_ENSURE(state);
280+
state->OffloadFree(fullSize);
281+
header->Entry.Unlink();
282+
}
283+
284+
Y_ENSURE(size == header->Size);
285+
ReleaseAlignedPage(header, fullSize);
286+
}
287+
288+
void MKQLArrowUntrack(const void* mem) {
289+
TAllocState* state = TlsAllocState;
290+
Y_ENSURE(state);
291+
auto header = ((TMkqlArrowHeader*)mem) - 1;
292+
if (!header->Entry.IsUnlinked()) {
293+
header->Entry.Unlink();
294+
auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
295+
state->OffloadFree(fullSize);
296+
}
245297
}
246298

247299
} // NMiniKQL

ydb/library/yql/minikql/mkql_alloc.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ struct TAllocState : public TAlignedPagePool
5050
void Link(TListEntry* root) noexcept;
5151
void Unlink() noexcept;
5252
void InitLinks() noexcept { Left = Right = this; }
53+
void Clear() noexcept { Left = Right = nullptr; }
54+
bool IsUnlinked() const noexcept { return !Left && !Right; }
5355
};
5456

5557
#ifndef NDEBUG
@@ -74,7 +76,9 @@ struct TAllocState : public TAlignedPagePool
7476
TListEntry OffloadedBlocksRoot;
7577
TListEntry GlobalPAllocList;
7678
TListEntry* CurrentPAllocList;
77-
std::shared_ptr<std::atomic<size_t>> ArrowMemoryUsage = std::make_shared<std::atomic<size_t>>();
79+
TListEntry ArrowBlocksRoot;
80+
bool EnableArrowTracking = true;
81+
7882
void* MainContext = nullptr;
7983
void* CurrentContext = nullptr;
8084

@@ -97,6 +101,7 @@ struct TAllocState : public TAlignedPagePool
97101
void InvalidateMemInfo();
98102
size_t GetDeallocatedInPages() const;
99103
static void CleanupPAllocList(TListEntry* root);
104+
static void CleanupArrowList(TListEntry* root);
100105

101106
void LockObject(::NKikimr::NUdf::TUnboxedValuePod value);
102107
void UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value);
@@ -163,6 +168,15 @@ static_assert(sizeof(TMkqlPAllocHeader) ==
163168
sizeof(TAllocState::TListEntry) +
164169
sizeof(void*), "Padding is not allowed");
165170

171+
constexpr size_t ArrowAlignment = 64;
172+
struct TMkqlArrowHeader {
173+
TAllocState::TListEntry Entry;
174+
ui64 Size;
175+
char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
176+
};
177+
178+
static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);
179+
166180
class TScopedAlloc {
167181
public:
168182
explicit TScopedAlloc(const TSourceLocation& location,
@@ -410,6 +424,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
410424
void* MKQLArrowAllocate(ui64 size);
411425
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
412426
void MKQLArrowFree(const void* mem, ui64 size);
427+
void MKQLArrowUntrack(const void* mem);
413428

414429
template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
415430
struct TWithMiniKQLAlloc {

0 commit comments

Comments
 (0)