Skip to content

Commit 6db67bf

Browse files
committed
init
1 parent 9dfbfa9 commit 6db67bf

File tree

3 files changed

+71
-10
lines changed

3 files changed

+71
-10
lines changed

ydb/library/yql/minikql/computation/mkql_block_transport.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ namespace NKikimr::NMiniKQL {
1010

1111
namespace {
1212

13+
TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
14+
MKQLArrowUntrack(owner->data());
15+
return NYql::MakeReadOnlyRope(owner, data, size);
16+
}
17+
1318
class TOwnedArrowBuffer : public arrow::Buffer {
1419
public:
1520
TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner)
@@ -76,7 +81,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
7681
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
7782
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
7883
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
79-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes));
84+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes));
8085
}
8186

8287
void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
@@ -205,7 +210,7 @@ class TFixedSizeBlockSerializer final : public IBlockSerializer {
205210
const ui64 desiredOffset = data.offset % 8;
206211
const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
207212
size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
208-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes));
213+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes));
209214
}
210215
};
211216

@@ -277,11 +282,11 @@ class TStringBlockSerializer final : public IBlockSerializer {
277282
const ui64 desiredOffset = data.offset % 8;
278283
const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
279284
size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
280-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize));
285+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize));
281286

282287
const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
283288
size_t mainSize = data.buffers[2]->size();
284-
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize));
289+
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize));
285290
}
286291
};
287292

ydb/library/yql/minikql/mkql_alloc.cpp

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,33 @@ TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAligne
3131
GetRoot()->InitLinks();
3232
OffloadedBlocksRoot.InitLinks();
3333
GlobalPAllocList.InitLinks();
34+
ArrowBlocksRoot.InitLinks();
3435
}
3536

3637
void TAllocState::CleanupPAllocList(TListEntry* root) {
3738
for (auto curr = root->Right; curr != root; ) {
3839
auto next = curr->Right;
3940
auto size = ((TMkqlPAllocHeader*)curr)->Size;
40-
auto fullSize = size + sizeof(TMkqlPAllocHeader);
41+
auto fullSize = size + sizeof(TMkqlPAllocHeader);
4142
MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
4243
curr = next;
4344
}
4445

4546
root->InitLinks();
4647
}
4748

49+
void TAllocState::CleanupArrowList(TListEntry* root) {
50+
for (auto curr = root->Right; curr != root; ) {
51+
auto next = curr->Right;
52+
auto size = ((TMkqlArrowHeader*)curr)->Size;
53+
auto fullSize = size + sizeof(TMkqlArrowHeader);
54+
ReleaseAlignedPage(curr, fullSize);
55+
curr = next;
56+
}
57+
58+
root->InitLinks();
59+
}
60+
4861
void TAllocState::KillAllBoxed() {
4962
{
5063
const auto root = GetRoot();
@@ -72,6 +85,8 @@ void TAllocState::KillAllBoxed() {
7285
OffloadedBlocksRoot.InitLinks();
7386
}
7487

88+
CleanupArrowList(&ArrowBlocksRoot);
89+
7590
#ifndef NDEBUG
7691
ActiveMemInfo.clear();
7792
#endif
@@ -230,18 +245,47 @@ void TPagedArena::Clear() noexcept {
230245
}
231246

232247
void* MKQLArrowAllocate(ui64 size) {
233-
return GetAlignedPage(size);
248+
TAllocState* state = TlsAllocState;
249+
Y_ENSURE(state);
250+
auto fullSize = size + sizeof(TMkqlArrowHeader);
251+
state->OffloadAlloc(fullSize);
252+
auto ptr = GetAlignedPage(fullSize);
253+
auto header = (TMkqlArrowHeader*)ptr;
254+
header->Entry.Link(&state->ArrowBlocksRoot);
255+
header->Size = size;
256+
return header + 1;
234257
}
235258

236259
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
237-
auto res = GetAlignedPage(size);
260+
auto res = MKQLArrowAllocate(size);
238261
memcpy(res, mem, Min(prevSize, size));
239-
ReleaseAlignedPage(const_cast<void*>(mem), prevSize);
262+
MKQLArrowFree(mem, prevSize);
240263
return res;
241264
}
242265

243266
void MKQLArrowFree(const void* mem, ui64 size) {
244-
ReleaseAlignedPage(const_cast<void*>(mem), size);
267+
auto fullSize = size + sizeof(TMkqlArrowHeader);
268+
auto header = ((TMkqlArrowHeader*)mem) - 1;
269+
if (!header->Entry.IsUnlinked()) {
270+
TAllocState* state = TlsAllocState;
271+
Y_ENSURE(state);
272+
state->OffloadFree(fullSize);
273+
header->Entry.Unlink();
274+
}
275+
276+
Y_ENSURE(size == header->Size);
277+
ReleaseAlignedPage(header, fullSize);
278+
}
279+
280+
void MKQLArrowUntrack(const void* mem) {
281+
TAllocState* state = TlsAllocState;
282+
Y_ENSURE(state);
283+
auto header = ((TMkqlArrowHeader*)mem) - 1;
284+
if (!header->Entry.IsUnlinked()) {
285+
header->Entry.Unlink();
286+
auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
287+
state->OffloadFree(fullSize);
288+
}
245289
}
246290

247291
} // NMiniKQL

ydb/library/yql/minikql/mkql_alloc.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct TAllocState : public TAlignedPagePool
5050
void Link(TListEntry* root) noexcept;
5151
void Unlink() noexcept;
5252
void InitLinks() noexcept { Left = Right = this; }
53+
bool IsUnlinked() const noexcept { return !Left && !Right; }
5354
};
5455

5556
#ifndef NDEBUG
@@ -74,7 +75,7 @@ struct TAllocState : public TAlignedPagePool
7475
TListEntry OffloadedBlocksRoot;
7576
TListEntry GlobalPAllocList;
7677
TListEntry* CurrentPAllocList;
77-
std::shared_ptr<std::atomic<size_t>> ArrowMemoryUsage = std::make_shared<std::atomic<size_t>>();
78+
TListEntry ArrowBlocksRoot;
7879
void* MainContext = nullptr;
7980
void* CurrentContext = nullptr;
8081

@@ -97,6 +98,7 @@ struct TAllocState : public TAlignedPagePool
9798
void InvalidateMemInfo();
9899
size_t GetDeallocatedInPages() const;
99100
static void CleanupPAllocList(TListEntry* root);
101+
static void CleanupArrowList(TListEntry* root);
100102

101103
void LockObject(::NKikimr::NUdf::TUnboxedValuePod value);
102104
void UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value);
@@ -163,6 +165,15 @@ static_assert(sizeof(TMkqlPAllocHeader) ==
163165
sizeof(TAllocState::TListEntry) +
164166
sizeof(void*), "Padding is not allowed");
165167

168+
constexpr size_t ArrowAlignment = 64;
169+
struct TMkqlArrowHeader {
170+
TAllocState::TListEntry Entry;
171+
ui64 Size;
172+
char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
173+
};
174+
175+
static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);
176+
166177
class TScopedAlloc {
167178
public:
168179
explicit TScopedAlloc(const TSourceLocation& location,
@@ -410,6 +421,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
410421
void* MKQLArrowAllocate(ui64 size);
411422
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
412423
void MKQLArrowFree(const void* mem, ui64 size);
424+
void MKQLArrowUntrack(const void* mem);
413425

414426
template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
415427
struct TWithMiniKQLAlloc {

0 commit comments

Comments
 (0)