Skip to content

Commit 75df170

Browse files
committed
BufferPool-v1.0:Functionality correct but performance issues still exist
1 parent d5faa3b commit 75df170

35 files changed

+1282
-226
lines changed

cpp/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
2020

2121
add_subdirectory(pixels-common)
2222
add_subdirectory(pixels-core)
23-
add_subdirectory(pixels-cli)
23+
#add_subdirectory(pixels-cli)
2424
add_subdirectory(third-party/googletest)
2525
add_subdirectory(tests)
2626

cpp/Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ debug: deps
5757

5858
release: deps
5959
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/release && \
60-
cmake --build build/release --config Release
60+
cmake --build build/release --config Release
61+
62+
relWithDebInfo: deps
63+
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/relWithDebInfo && \
64+
cmake --build build/relWithDebInfo --config RelWithDebInfo

cpp/include/PixelsReadGlobalState.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ namespace duckdb
4040
{
4141
mutex lock;
4242

43+
atomic<int> active_threads; // 活跃线程数
44+
atomic<bool> all_done; // 是否所有线程都已完成
45+
4346
//! The initial reader from the bind phase
4447
std::shared_ptr <PixelsReader> initialPixelsReader;
4548

cpp/pixels-common/CMakeLists.txt

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,21 @@ set(CMAKE_CXX_STANDARD 17)
44

55
include(ExternalProject)
66

7-
set(pixels_common_cxx
8-
lib/physical/storage/LocalFS.cpp
9-
lib/physical/storage/LocalFSProvider.cpp
10-
lib/physical/storage/PhysicalLocalWriter.cpp
11-
lib/physical/PhysicalWriterOption.cpp
12-
lib/physical/Status.cpp
13-
lib/physical/Storage.cpp
14-
lib/physical/FilePath.cpp
15-
lib/physical/natives/PixelsRandomAccessFile.cpp
16-
lib/physical/natives/DirectRandomAccessFile.cpp
17-
lib/physical/natives/ByteBuffer.cpp
18-
lib/physical/io/PhysicalLocalReader.cpp
19-
lib/physical/StorageFactory.cpp
20-
lib/physical/Request.cpp
21-
lib/physical/RequestBatch.cpp
22-
lib/physical/scheduler/NoopScheduler.cpp
23-
lib/physical/SchedulerFactory.cpp
24-
lib/exception/InvalidArgumentException.cpp
25-
lib/utils/Constants.cpp
26-
lib/utils/String.cpp
27-
include/physical/natives/DirectIoLib.h
28-
lib/physical/natives/DirectIoLib.cpp
29-
include/utils/ConfigFactory.h
30-
lib/utils/ConfigFactory.cpp
31-
include/physical/MergedRequest.h
32-
include/physical/scheduler/SortMergeScheduler.h
33-
lib/physical/scheduler/SortMergeScheduler.cpp
34-
lib/MergedRequest.cpp include/profiler/TimeProfiler.h
35-
lib/profiler/TimeProfiler.cpp
36-
include/profiler/CountProfiler.h
37-
lib/profiler/CountProfiler.cpp
38-
include/profiler/AbstractProfiler.h
39-
include/physical/allocator/Allocator.h
40-
include/physical/allocator/OrdinaryAllocator.h
41-
lib/physical/allocator/OrdinaryAllocator.cpp
42-
include/physical/allocator/BufferPoolAllocator.h
43-
lib/physical/allocator/BufferPoolAllocator.cpp
44-
include/physical/BufferPool.h
45-
lib/physical/BufferPool.cpp
46-
include/physical/natives/DirectUringRandomAccessFile.h
47-
lib/physical/natives/DirectUringRandomAccessFile.cpp
48-
include/utils/ColumnSizeCSVReader.h lib/utils/ColumnSizeCSVReader.cpp
49-
include/physical/StorageArrayScheduler.h lib/physical/StorageArrayScheduler.cpp
50-
include/physical/natives/ByteOrder.h
7+
8+
file(GLOB_RECURSE pixels_common_cxx
9+
"lib/physical/*.cpp"
10+
"lib/physical/*.h"
11+
"lib/exception/*.cpp"
12+
"lib/exception/*.h"
13+
"lib/utils/*.cpp"
14+
"lib/utils/*.h"
15+
"lib/profiler/*.cpp"
16+
"lib/profiler/*.h"
17+
"include/physical/*.h"
18+
"include/profiler/*.h"
19+
"include/utils/*.h"
20+
"include/physical/BufferPool/*.h"
21+
"lib/MergedRequest.cpp"
5122
)
5223

5324
include_directories(include)

cpp/pixels-common/include/physical/BufferPool.h

Lines changed: 123 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,41 +33,154 @@
3333
#include "exception/InvalidArgumentException.h"
3434
#include "utils/ColumnSizeCSVReader.h"
3535
#include <map>
36-
36+
#include "physical/BufferPool/Bitmap.h"
37+
#include "physical/BufferPool/BufferPoolEntry.h"
38+
#include <mutex>
39+
#include <thread>
40+
#include <cstdio>
3741
// when allocating buffer pool, we use the size of the first pxl file. Consider that
3842
// the remaining pxl file has larger size than the first file, we allocate some extra
3943
// size (10MB) to each column.
4044
// TODO: how to evaluate the maximal pool size
41-
#define EXTRA_POOL_SIZE 3*1024*1024
45+
#define EXTRA_POOL_SIZE 10*1024*1024
4246

4347
class DirectUringRandomAccessFile;
4448
// This class is global class. The variable is shared by each thread
4549
class BufferPool
4650
{
4751
public:
52+
// 嵌套子类,用于管理缓冲区池条目及其属性
53+
class BufferPoolManagedEntry {
54+
public:
55+
enum class State{
56+
InitizaledNotAllocated,
57+
AllocatedAndInUse,
58+
UselessButNotFree
59+
};
60+
private:
61+
std::shared_ptr<BufferPoolEntry> bufferPoolEntry; // 指向缓冲区池条目的智能指针
62+
int ring_index; // 环形缓冲区索引
63+
size_t current_size; // 当前使用大小
64+
int offset; // 偏移量
65+
State state;
66+
67+
68+
69+
public:
70+
71+
BufferPoolManagedEntry(std::shared_ptr<BufferPoolEntry> entry,
72+
int ringIdx,
73+
size_t currSize,
74+
off_t off)
75+
: bufferPoolEntry(std::move(entry)),
76+
ring_index(ringIdx),
77+
current_size(currSize),
78+
offset(off) ,
79+
state(State::InitizaledNotAllocated){}
80+
81+
std::shared_ptr<BufferPoolEntry> getBufferPoolEntry() const {
82+
return bufferPoolEntry;
83+
}
84+
85+
int getRingIndex() const {
86+
return ring_index;
87+
}
88+
89+
void setRingIndex(int index) {
90+
ring_index = index;
91+
}
92+
93+
size_t getCurrentSize() const {
94+
return current_size;
95+
}
96+
97+
void setCurrentSize(size_t size) {
98+
current_size = size;
99+
}
100+
101+
int getOffset() const {
102+
return offset;
103+
}
104+
105+
void setOffset(int off) {
106+
offset = off;
107+
}
108+
109+
// 获取当前状态
110+
State getStatus() const {
111+
return state;
112+
}
113+
114+
// 设置状态
115+
void setStatus(State newStatus) {
116+
state = newStatus;
117+
}
118+
};
119+
48120
static void
49121
Initialize(std::vector <uint32_t> colIds, std::vector <uint64_t> bytes, std::vector <std::string> columnNames);
50122

51-
static std::shared_ptr <ByteBuffer> GetBuffer(uint32_t colId);
123+
static void
124+
InitializeBuffers();
125+
126+
static std::shared_ptr <ByteBuffer> GetBuffer(uint32_t colId,uint64_t byte,std::string columnName);
52127

53-
static int64_t GetBufferId(uint32_t index);
128+
static int64_t GetBufferId();
54129

55130
static void Switch();
56131

57132
static void Reset();
58133

134+
static std::shared_ptr<BufferPoolEntry> AddNewBuffer(size_t size);
135+
136+
static int getRingIndex(uint32_t colId);
137+
138+
static std::shared_ptr<ByteBuffer> AllocateNewBuffer(std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry, uint32_t colId,uint64_t byte,std::string columnName);
139+
140+
static std::shared_ptr<ByteBuffer> ReusePreviousBuffer(std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry,uint32_t colId,uint64_t byte,std::string columnName);
141+
142+
static void PrintStats() {
143+
// 打印当前线程 ID
144+
std::thread::id tid = std::this_thread::get_id();
145+
146+
printf("线程 %zu -> 全局缓冲区使用: %ld / %ld\n",
147+
std::hash<std::thread::id>{}(tid), // 转换成整数便于阅读
148+
global_used_size, global_free_size);
149+
150+
// 线程局部统计
151+
printf("线程 %zu -> Buffer0使用: %zu, 缓冲区数量: %d\n",
152+
std::hash<std::thread::id>{}(tid),
153+
thread_local_used_size[0], thread_local_buffer_count[0]);
154+
155+
printf("线程 %zu -> Buffer1使用: %zu, 缓冲区数量: %d\n",
156+
std::hash<std::thread::id>{}(tid),
157+
thread_local_used_size[1], thread_local_buffer_count[1]);
158+
}
59159
private:
60160
BufferPool() = default;
161+
// global
162+
static std::mutex bufferPoolMutex;
61163

164+
// thread local
165+
static thread_local bool isInitialized;
166+
static thread_local std::vector <std::shared_ptr<BufferPoolEntry>> registeredBuffers[2];
167+
static thread_local long global_used_size;
168+
static thread_local long global_free_size;
169+
static thread_local std::shared_ptr <DirectIoLib> directIoLib;
170+
static thread_local int nextRingIndex;
171+
static thread_local std::shared_ptr<BufferPoolEntry> nextEmptyBufferPoolEntry[2];
62172
static thread_local int colCount;
63-
static thread_local std::map<uint32_t, uint64_t>
64-
nrBytes;
65-
static thread_local bool isInitialized;
66-
static thread_local std::map<uint32_t, std::shared_ptr < ByteBuffer>>
67-
buffers[2];
68-
static std::shared_ptr <DirectIoLib> directIoLib;
173+
69174
static thread_local int currBufferIdx;
70175
static thread_local int nextBufferIdx;
176+
static thread_local std::map <uint32_t, std::shared_ptr<ByteBuffer>> buffersAllocated[2];
71177
friend class DirectUringRandomAccessFile;
178+
179+
static thread_local std::unordered_map<uint32_t, std::shared_ptr<BufferPoolManagedEntry>> ringBufferMap[2];
180+
181+
182+
183+
static thread_local size_t thread_local_used_size[2]; // 线程已使用大小
184+
static thread_local int thread_local_buffer_count[2]; // 线程持有的缓冲区数量
72185
};
73186
#endif // DUCKDB_BUFFERPOOL_H
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
/* @author whz
3+
/* @create 7/30/25.
4+
*/
5+
6+
#ifndef BITMAP_H
7+
#define BITMAP_H
8+
#include <cstddef>
9+
#include "exception/InvalidArgumentException.h"
10+
#include <iostream>
11+
12+
class Bitmap {
13+
public:
14+
15+
explicit Bitmap(size_t size)
16+
: bits((size + 7) / 8, 0), num_bits(size) {}
17+
18+
void set(size_t index) {
19+
if (index >= num_bits) throw InvalidArgumentException("Bitmap::set: index out of range");
20+
bits[index / 8] |= (1 << (index % 8));
21+
}
22+
23+
void clear(size_t index) {
24+
if (index >= num_bits) throw InvalidArgumentException("Bitmap::clear: index out of range");
25+
bits[index / 8] &= ~(1 << (index % 8));
26+
}
27+
28+
bool test(size_t index) const {
29+
if (index >= num_bits) throw InvalidArgumentException("Bitmap::test: index out of range");
30+
return bits[index / 8] & (1 << (index % 8));
31+
}
32+
33+
size_t size() const { return num_bits; }
34+
35+
void print() const {
36+
for (size_t i = 0; i < num_bits; ++i) {
37+
std::cout << (test(i) ? '1' : '0');
38+
if ((i + 1) % 8 == 0) std::cout << ' ';
39+
}
40+
std::cout << '\n';
41+
}
42+
43+
private:
44+
std::vector<uint8_t> bits;
45+
size_t num_bits;
46+
};
47+
48+
#endif //BITMAP_H
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
/* @author whz
3+
/* @create 7/30/25.
4+
*/
5+
6+
#ifndef BUFFERPOOLENTRY_H
7+
#define BUFFERPOOLENTRY_H
8+
#include <memory>
9+
#include <physical/natives/ByteBuffer.h>
10+
#include <physical/natives/DirectIoLib.h>
11+
#include "physical/BufferPool/Bitmap.h"
12+
class BufferPoolEntry {
13+
public:
14+
explicit BufferPoolEntry(size_t size,int slice_size,std::shared_ptr<DirectIoLib> direct_lib,int offset,int ring_index);
15+
size_t getSize() const;
16+
std::shared_ptr<Bitmap> getBitmap() const;
17+
std::shared_ptr<ByteBuffer> getBuffer() const;
18+
bool isFull() const;
19+
int getNextFreeIndex() const;
20+
int setNextFreeIndex(int index);
21+
~BufferPoolEntry();
22+
uint64_t checkCol(uint32_t) const;
23+
void addCol(uint32_t colId,uint64_t bytes);
24+
bool isInUse() const;
25+
void setInUse(bool in_use);
26+
int getOffsetInBuffers() const;
27+
void setOffsetInBuffers(int offset);
28+
bool getIsRegistered() const;
29+
void setIsRegistered(bool registered);
30+
int getRingIndex() const;
31+
void setRingIndex(int ring_index);
32+
void reset();
33+
private:
34+
size_t size_;
35+
std::shared_ptr<Bitmap> bitmap_;
36+
std::shared_ptr<ByteBuffer> buffer_;
37+
bool is_full_;
38+
int next_free_;
39+
std::map<uint32_t, uint64_t> nr_bytes_;
40+
bool is_in_use_;
41+
int offset_in_buffers_;
42+
bool is_registered;
43+
int ring_index;
44+
};
45+
46+
#endif //BUFFERPOOLENTRY_H

cpp/pixels-common/include/physical/Request.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class Request
3434
uint64_t queryId;
3535
uint64_t start;
3636
uint64_t length;
37+
int ring_index;
3738

3839
Request(uint64_t queryId_, uint64_t start_, uint64_t length_,
3940
int64_t bufferId = -1);

cpp/pixels-common/include/physical/RequestBatch.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class RequestBatch
4242

4343
void add(Request request);
4444

45+
Request& getRequest(int index);
46+
4547
int getSize();
4648

4749
std::vector <Request> getRequests();

cpp/pixels-common/include/physical/Scheduler.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "physical/PhysicalReader.h"
2929
#include "physical/RequestBatch.h"
3030
#include "profiler/TimeProfiler.h"
31+
#include <mutex>
3132

3233
class Scheduler
3334
{
@@ -46,5 +47,6 @@ class Scheduler
4647
RequestBatch batch,
4748
std::vector <std::shared_ptr<ByteBuffer>> reuseBuffers,
4849
long queryId) = 0;
50+
std::mutex mtx;
4951
};
5052
#endif //PIXELS_SCHEDULER_H

0 commit comments

Comments
 (0)