Skip to content

Commit 8aad40e

Browse files
author
zuochunwei
committed
spillheap
1 parent 68be4e0 commit 8aad40e

File tree

4 files changed

+227
-469
lines changed

4 files changed

+227
-469
lines changed

velox/common/file/File.h

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,164 @@ class InMemoryWriteFile final : public WriteFile {
229229
std::string* FOLLY_NONNULL file_;
230230
};
231231

232+
// TODO zuochunwei
233+
struct HeapMemoryMock {
234+
HeapMemoryMock() = default;
235+
explicit HeapMemoryMock(void* memory, size_t capacity)
236+
: memory_(memory), capacity_(capacity) {}
237+
238+
void reset() {
239+
memory_ = nullptr;
240+
size_ = 0;
241+
capacity_ = 0;
242+
}
243+
244+
bool isValid() const {
245+
return memory_ != nullptr;
246+
}
247+
248+
void write(const void* src, size_t len) {
249+
assert(len <= freeSize());
250+
memcpy(end(), src, len);
251+
size_ += len;
252+
}
253+
254+
void read(void* dst, size_t len, size_t offset) {
255+
assert(offset + len <= size_);
256+
memcpy(dst, (char*)memory_ + offset, len);
257+
}
258+
259+
auto size() const {
260+
return size_;
261+
}
262+
263+
auto freeSize() const {
264+
return capacity_ - size_;
265+
}
266+
267+
void* begin() {
268+
return memory_;
269+
}
270+
271+
void* end() {
272+
return (char*)memory_ + size_;
273+
}
274+
275+
void* memory_ = nullptr;
276+
size_t size_ = 0;
277+
size_t capacity_ = 0;
278+
};
279+
280+
const size_t kHeapMemoryCapacity = 64 * 1024;
281+
282+
class HeapMemoryMockManager {
283+
public:
284+
static HeapMemoryMockManager& instance() {
285+
static HeapMemoryMockManager hmmm;
286+
return hmmm;
287+
}
288+
289+
HeapMemoryMock alloc(size_t size) {
290+
HeapMemoryMock heapMemory;
291+
if (size_ + size <= kHeapMemoryCapacity) {
292+
heapMemory.memory_ = malloc(size);
293+
heapMemory.size_ = 0;
294+
heapMemory.capacity_ = size;
295+
size_ += size;
296+
}
297+
return heapMemory;
298+
}
299+
300+
void free(HeapMemoryMock& heapMemory) {
301+
if (heapMemory.isValid()) {
302+
size_ -= heapMemory.size_;
303+
::free(heapMemory.memory_);
304+
heapMemory.reset();
305+
}
306+
}
307+
308+
private:
309+
std::atomic<std::size_t> size_;
310+
};
311+
312+
inline HeapMemoryMock allocHeapMemory(size_t size) {
313+
return HeapMemoryMockManager::instance().alloc(size);
314+
}
315+
316+
inline void freeHeapMemory(HeapMemoryMock& heapMemory) {
317+
HeapMemoryMockManager::instance().free(heapMemory);
318+
}
319+
320+
class HeapMemoryReadFile : public ReadFile {
321+
public:
322+
explicit HeapMemoryReadFile(HeapMemoryMock& heapMemory)
323+
: heapMemory_(heapMemory) {}
324+
325+
std::string_view pread(
326+
uint64_t offset,
327+
uint64_t length,
328+
void* FOLLY_NONNULL buf) const override {
329+
bytesRead_ += length;
330+
heapMemory_.read(buf, length, offset);
331+
return {static_cast<char*>(buf), length};
332+
}
333+
334+
std::string pread(uint64_t offset, uint64_t length) const override {
335+
bytesRead_ += length;
336+
assert(offset + lenght <= heapMemory_.size());
337+
return std::string((char*)heapMemory_.begin() + offset, length);
338+
}
339+
340+
uint64_t size() const final {
341+
return heapMemory_.size();
342+
}
343+
344+
uint64_t memoryUsage() const final {
345+
return size();
346+
}
347+
348+
// Mainly for testing. Coalescing isn't helpful for in memory data.
349+
void setShouldCoalesce(bool shouldCoalesce) {
350+
shouldCoalesce_ = shouldCoalesce;
351+
}
352+
bool shouldCoalesce() const final {
353+
return shouldCoalesce_;
354+
}
355+
356+
std::string getName() const override {
357+
return "<HeapMemoryReadFile>";
358+
}
359+
360+
uint64_t getNaturalReadSize() const override {
361+
return 1024;
362+
}
363+
364+
private:
365+
HeapMemoryMock& heapMemory_;
366+
bool shouldCoalesce_ = false;
367+
};
368+
369+
class HeapMemoryWriteFile final : public WriteFile {
370+
public:
371+
explicit HeapMemoryWriteFile(HeapMemoryMock& heapMemory)
372+
: heapMemory_(heapMemory) {}
373+
374+
void append(std::string_view data) final {
375+
heapMemory_.write(data.data(), data.length());
376+
}
377+
378+
void flush() final {}
379+
380+
void close() final {}
381+
382+
uint64_t size() const final {
383+
return heapMemory_.size_;
384+
}
385+
386+
private:
387+
HeapMemoryMock& heapMemory_;
388+
};
389+
232390
// Current implementation for the local version is quite simple (e.g. no
233391
// internal arenaing), as local disk writes are expected to be cheap. Local
234392
// files match against any filepath starting with '/'.

velox/exec/Spill.cpp

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,46 @@ void SpillMergeStream::pop() {
4545
}
4646
}
4747

48-
WriteFile& SpillFile::output() {
49-
if (!output_) {
48+
void SpillFile::newOutput() {
49+
heapMemoryMock_ = allocHeapMemory(targetFileSize_);
50+
if (heapMemoryMock_.isValid()) {
51+
output_ = std::make_unique<HeapMemoryWriteFile>(heapMemoryMock_);
52+
toWhere_ = TO_HEAP;
53+
} else {
5054
auto fs = filesystems::getFileSystem(path_, nullptr);
5155
output_ = fs->openFileForWrite(path_);
56+
toWhere_ = TO_FILE;
57+
}
58+
}
59+
60+
WriteFile& SpillFile::output() {
61+
if (!output_) {
62+
newOutput();
5263
}
5364
return *output_;
5465
}
5566

5667
void SpillFile::startRead() {
5768
constexpr uint64_t kMaxReadBufferSize =
5869
(1 << 20) - AlignedBuffer::kPaddedSize; // 1MB - padding.
70+
5971
VELOX_CHECK(!output_);
6072
VELOX_CHECK(!input_);
61-
auto fs = filesystems::getFileSystem(path_, nullptr);
62-
auto file = fs->openFileForRead(path_);
63-
auto buffer = AlignedBuffer::allocate<char>(
64-
std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
65-
input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
73+
74+
if (toWhere_ == TO_FILE) {
75+
auto fs = filesystems::getFileSystem(path_, nullptr);
76+
auto file = fs->openFileForRead(path_);
77+
auto buffer = AlignedBuffer::allocate<char>(
78+
std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
79+
input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
80+
} else if (toWhere_ == TO_HEAP) {
81+
auto file = std::make_unique<HeapMemoryReadFile>(heapMemoryMock_);
82+
auto buffer = AlignedBuffer::allocate<char>(
83+
std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
84+
input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
85+
} else {
86+
VELOX_FAIL("invalid spill destination");
87+
}
6688
}
6789

6890
bool SpillFile::nextBatch(RowVectorPtr& rowVector) {
@@ -74,18 +96,20 @@ bool SpillFile::nextBatch(RowVectorPtr& rowVector) {
7496
return true;
7597
}
7698

77-
WriteFile& SpillFileList::currentOutput() {
99+
WriteFile& SpillFileList::currentOutput(size_t toAppendSize) {
78100
if (files_.empty() || !files_.back()->isWritable() ||
79-
files_.back()->size() > targetFileSize_) {
101+
files_.back()->size() + toAppendSize > targetFileSize_) {
80102
if (!files_.empty() && files_.back()->isWritable()) {
81103
files_.back()->finishWrite();
82104
}
105+
assert(toAppendSize <= targetFileSize_);
83106
files_.push_back(std::make_unique<SpillFile>(
84107
type_,
85108
numSortingKeys_,
86109
sortCompareFlags_,
87110
fmt::format("{}-{}", path_, files_.size()),
88-
pool_));
111+
pool_,
112+
targetFileSize_));
89113
}
90114
return files_.back()->output();
91115
}
@@ -97,7 +121,14 @@ void SpillFileList::flush() {
97121
batch_->flush(&out);
98122
batch_.reset();
99123
auto iobuf = out.getIOBuf();
100-
auto& file = currentOutput();
124+
125+
size_t toAppendSize = 0;
126+
for (auto& range : *iobuf) {
127+
toAppendSize += range.size();
128+
}
129+
130+
auto& file = currentOutput(toAppendSize);
131+
101132
for (auto& range : *iobuf) {
102133
file.append(std::string_view(
103134
reinterpret_cast<const char*>(range.data()), range.size()));

velox/exec/Spill.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,20 +68,28 @@ class SpillFile {
6868
int32_t numSortingKeys,
6969
const std::vector<CompareFlags>& sortCompareFlags,
7070
const std::string& path,
71-
memory::MemoryPool& pool)
71+
memory::MemoryPool& pool,
72+
uint64_t targetFileSize)
7273
: type_(std::move(type)),
7374
numSortingKeys_(numSortingKeys),
7475
sortCompareFlags_(sortCompareFlags),
7576
pool_(pool),
7677
ordinal_(ordinalCounter_++),
77-
path_(fmt::format("{}-{}", path, ordinal_)) {
78+
path_(fmt::format("{}-{}", path, ordinal_)),
79+
targetFileSize_(targetFileSize) {
7880
// NOTE: if the spilling operator has specified the sort comparison flags,
7981
// then it must match the number of sorting keys.
8082
VELOX_CHECK(
8183
sortCompareFlags_.empty() ||
8284
sortCompareFlags_.size() == numSortingKeys_);
8385
}
8486

87+
~SpillFile() {
88+
if (heapMemoryMock_.isValid()) {
89+
freeHeapMemory(heapMemoryMock_);
90+
}
91+
}
92+
8593
int32_t numSortingKeys() const {
8694
return numSortingKeys_;
8795
}
@@ -133,6 +141,8 @@ class SpillFile {
133141
}
134142

135143
private:
144+
void newOutput();
145+
136146
static std::atomic<int32_t> ordinalCounter_;
137147

138148
// Type of 'rowVector_'. Needed for setting up writing.
@@ -145,8 +155,17 @@ class SpillFile {
145155
const int32_t ordinal_;
146156
const std::string path_;
147157

158+
enum {
159+
TO_FILE,
160+
TO_HEAP,
161+
} toWhere_ = TO_FILE;
162+
163+
HeapMemoryMock heapMemoryMock_;
164+
148165
// Byte size of the backing file. Set when finishing writing.
149166
uint64_t fileSize_ = 0;
167+
uint64_t targetFileSize_ = 0;
168+
150169
std::unique_ptr<WriteFile> output_;
151170
std::unique_ptr<SpillInput> input_;
152171
};
@@ -215,7 +234,7 @@ class SpillFileList {
215234

216235
private:
217236
// Returns the current file to write to and creates one if needed.
218-
WriteFile& currentOutput();
237+
WriteFile& currentOutput(size_t toAppendSize);
219238

220239
// Writes data from 'batch_' to the current output file.
221240
void flush();

0 commit comments

Comments
 (0)