Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,8 @@ debug: deps

release: deps
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/release && \
cmake --build build/release --config Release
cmake --build build/release --config Release

relWithDebInfo: deps
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/relWithDebInfo && \
cmake --build build/relWithDebInfo --config RelWithDebInfo
2 changes: 1 addition & 1 deletion cpp/include/PixelsReadBindData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace duckdb
std::shared_ptr <PixelsReader> initialPixelsReader;
std::shared_ptr <TypeDescription> fileSchema;
vector <string> files;
atomic <idx_t> curFileId;
atomic <int> curFileId;
};

}
Expand Down
3 changes: 3 additions & 0 deletions cpp/include/PixelsReadGlobalState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ namespace duckdb
{
mutex lock;

atomic<int> active_threads; // Number of active threads
atomic<bool> all_done; // Whether all threads have completed

//! The initial reader from the bind phase
std::shared_ptr <PixelsReader> initialPixelsReader;

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/PixelsScanFunction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ namespace duckdb
PixelsScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input,
GlobalTableFunctionState *gstate_p);

static bool PixelsParallelStateNext(ClientContext &context, const PixelsReadBindData &bind_data,
static bool PixelsParallelStateNext(ClientContext &context, PixelsReadBindData &bind_data,
PixelsReadLocalState &scan_data, PixelsReadGlobalState &parallel_state,
bool is_init_state = false);

Expand Down
74 changes: 74 additions & 0 deletions cpp/perf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/bin/bash

# Check number of arguments
if [ $# -ne 2 ]; then
echo "Usage: $0 <query_file> <output_file_prefix>"
echo "Example: $0 test_q01.sql query_1"
exit 1
fi

# Assign parameters
QUERY_FILE="$1"
OUTPUT_PREFIX="$2"

# Record start time (nanoseconds)
START_TIME=$(date +%s%N)
PREVIOUS_TIME=$START_TIME

# Function to display time duration
show_time() {
local start=$1
local end=$2
local stage=$3

# Calculate duration in milliseconds (integer arithmetic in bash)
local duration=$(( (end - start) / 1000000 ))
echo "Stage '$stage' duration: ${duration}ms"
}

echo "Starting performance analysis..."

# Run perf recording
echo "1. Running perf record..."
sudo -E perf record -F 1 --call-graph=dwarf -g ./build/release/duckdb < "$QUERY_FILE"
CURRENT_TIME=$(date +%s%N)
show_time $PREVIOUS_TIME $CURRENT_TIME "Running perf record"
PREVIOUS_TIME=$CURRENT_TIME

# Generate perf script output
echo "2. Generating perf script output..."
sudo perf script -i perf.data > "${OUTPUT_PREFIX}.perf"
CURRENT_TIME=$(date +%s%N)
show_time $PREVIOUS_TIME $CURRENT_TIME "Generating perf script output"
PREVIOUS_TIME=$CURRENT_TIME

# Collapse call stacks
echo "3. Collapsing call stacks..."
stackcollapse-perf.pl "${OUTPUT_PREFIX}.perf" > "${OUTPUT_PREFIX}.folded"
CURRENT_TIME=$(date +%s%N)
show_time $PREVIOUS_TIME $CURRENT_TIME "Collapsing call stacks"
PREVIOUS_TIME=$CURRENT_TIME

# Generate flame graph
echo "4. Generating flame graph..."
flamegraph.pl "${OUTPUT_PREFIX}.folded" > "${OUTPUT_PREFIX}-cpu.svg"
CURRENT_TIME=$(date +%s%N)
show_time $PREVIOUS_TIME $CURRENT_TIME "Generating flame graph"
PREVIOUS_TIME=$CURRENT_TIME

# Rename perf data file
echo "5. Renaming perf data file..."
mv perf.data "${OUTPUT_PREFIX}-perf.data"
CURRENT_TIME=$(date +%s%N)
show_time $PREVIOUS_TIME $CURRENT_TIME "Renaming files"
PREVIOUS_TIME=$CURRENT_TIME

# Calculate total duration
TOTAL_DURATION=$(( (CURRENT_TIME - START_TIME) / 1000000 ))
echo "Total duration: ${TOTAL_DURATION}ms"

echo "Operation completed. Generated files:"
echo "${OUTPUT_PREFIX}.perf"
echo "${OUTPUT_PREFIX}.folded"
echo "${OUTPUT_PREFIX}-cpu.svg"
echo "${OUTPUT_PREFIX}-perf.data"
59 changes: 15 additions & 44 deletions cpp/pixels-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,21 @@ set(CMAKE_CXX_STANDARD 17)

include(ExternalProject)

set(pixels_common_cxx
lib/physical/storage/LocalFS.cpp
lib/physical/storage/LocalFSProvider.cpp
lib/physical/storage/PhysicalLocalWriter.cpp
lib/physical/PhysicalWriterOption.cpp
lib/physical/Status.cpp
lib/physical/Storage.cpp
lib/physical/FilePath.cpp
lib/physical/natives/PixelsRandomAccessFile.cpp
lib/physical/natives/DirectRandomAccessFile.cpp
lib/physical/natives/ByteBuffer.cpp
lib/physical/io/PhysicalLocalReader.cpp
lib/physical/StorageFactory.cpp
lib/physical/Request.cpp
lib/physical/RequestBatch.cpp
lib/physical/scheduler/NoopScheduler.cpp
lib/physical/SchedulerFactory.cpp
lib/exception/InvalidArgumentException.cpp
lib/utils/Constants.cpp
lib/utils/String.cpp
include/physical/natives/DirectIoLib.h
lib/physical/natives/DirectIoLib.cpp
include/utils/ConfigFactory.h
lib/utils/ConfigFactory.cpp
include/physical/MergedRequest.h
include/physical/scheduler/SortMergeScheduler.h
lib/physical/scheduler/SortMergeScheduler.cpp
lib/MergedRequest.cpp include/profiler/TimeProfiler.h
lib/profiler/TimeProfiler.cpp
include/profiler/CountProfiler.h
lib/profiler/CountProfiler.cpp
include/profiler/AbstractProfiler.h
include/physical/allocator/Allocator.h
include/physical/allocator/OrdinaryAllocator.h
lib/physical/allocator/OrdinaryAllocator.cpp
include/physical/allocator/BufferPoolAllocator.h
lib/physical/allocator/BufferPoolAllocator.cpp
include/physical/BufferPool.h
lib/physical/BufferPool.cpp
include/physical/natives/DirectUringRandomAccessFile.h
lib/physical/natives/DirectUringRandomAccessFile.cpp
include/utils/ColumnSizeCSVReader.h lib/utils/ColumnSizeCSVReader.cpp
include/physical/StorageArrayScheduler.h lib/physical/StorageArrayScheduler.cpp
include/physical/natives/ByteOrder.h

file(GLOB_RECURSE pixels_common_cxx
"lib/physical/*.cpp"
"lib/physical/*.h"
"lib/exception/*.cpp"
"lib/exception/*.h"
"lib/utils/*.cpp"
"lib/utils/*.h"
"lib/profiler/*.cpp"
"lib/profiler/*.h"
"include/physical/*.h"
"include/profiler/*.h"
"include/utils/*.h"
"include/physical/BufferPool/*.h"
"lib/MergedRequest.cpp"
)

include_directories(include)
Expand Down
169 changes: 150 additions & 19 deletions cpp/pixels-common/include/physical/BufferPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,180 @@
#ifndef DUCKDB_BUFFERPOOL_H
#define DUCKDB_BUFFERPOOL_H

#include <iostream>
#include <vector>
#include "exception/InvalidArgumentException.h"
#include "physical/BufferPool/Bitmap.h"
#include "physical/BufferPool/BufferPoolEntry.h"
#include "physical/natives/ByteBuffer.h"
#include <memory>
#include "physical/natives/DirectIoLib.h"
#include "exception/InvalidArgumentException.h"
#include "utils/ColumnSizeCSVReader.h"
#include <cstdio>
#include <iostream>
#include <map>

// when allocating buffer pool, we use the size of the first pxl file. Consider that
// the remaining pxl file has larger size than the first file, we allocate some extra
// size (10MB) to each column.
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
// when allocating buffer pool, we use the size of the first pxl file. Consider
// that the remaining pxl file has larger size than the first file, we allocate
// some extra size (10MB) to each column.
// TODO: how to evaluate the maximal pool size
#define EXTRA_POOL_SIZE 3*1024*1024
#define EXTRA_POOL_SIZE 10 * 1024 * 1024

class DirectUringRandomAccessFile;

// This class is global class. The variable is shared by each thread
class BufferPool
{
public:
static void
Initialize(std::vector <uint32_t> colIds, std::vector <uint64_t> bytes, std::vector <std::string> columnNames);
class BufferPoolManagedEntry
{
public:
enum class State
{
InitizaledNotAllocated,
AllocatedAndInUse,
UselessButNotFree
};

private:
std::shared_ptr<BufferPoolEntry> bufferPoolEntry;
int ring_index;
size_t current_size;
int offset;
State state;

public:
BufferPoolManagedEntry(std::shared_ptr<BufferPoolEntry> entry, int ringIdx,
size_t currSize, off_t off)
: bufferPoolEntry(std::move(entry)), ring_index(ringIdx),
current_size(currSize), offset(off),
state(State::InitizaledNotAllocated)
{
}

std::shared_ptr<BufferPoolEntry> getBufferPoolEntry() const
{
return bufferPoolEntry;
}

int getRingIndex() const
{
return ring_index;
}

void setRingIndex(int index)
{
ring_index = index;
}

size_t getCurrentSize() const
{
return current_size;
}

void setCurrentSize(size_t size)
{
current_size = size;
}

int getOffset() const
{
return offset;
}

static std::shared_ptr <ByteBuffer> GetBuffer(uint32_t colId);
void setOffset(int off)
{
offset = off;
}

static int64_t GetBufferId(uint32_t index);
State getStatus() const
{
return state;
}

void setStatus(State newStatus)
{
state = newStatus;
}
};

static void Initialize(std::vector<uint32_t> colIds,
std::vector<uint64_t> bytes,
std::vector<std::string> columnNames);

static void InitializeBuffers();

static std::shared_ptr<ByteBuffer> GetBuffer(uint32_t colId, uint64_t byte,
std::string columnName);

static int64_t GetBufferId();

static void Switch();

static void Reset();

static std::shared_ptr<BufferPoolEntry> AddNewBuffer(size_t size);

static int getRingIndex(uint32_t colId);

static std::shared_ptr<ByteBuffer> AllocateNewBuffer(
std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry,
uint32_t colId, uint64_t byte, std::string columnName);

static std::shared_ptr<ByteBuffer> ReusePreviousBuffer(
std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry,
uint32_t colId, uint64_t byte, std::string columnName);

static void PrintStats()
{
// Get the ID of the current thread
std::thread::id tid = std::this_thread::get_id();

// Print global buffer usage: used size / free size
// Convert thread ID to integer for readability using hash
printf("Thread %zu -> Global buffer usage: %ld / %ld\n",
std::hash<std::thread::id>{}(tid), global_used_size,
global_free_size);

// Print thread-local statistics for Buffer0
printf("Thread %zu -> Buffer0 usage: %zu, Buffer count: %d\n",
std::hash<std::thread::id>{}(tid), thread_local_used_size[0],
thread_local_buffer_count[0]);

// Print thread-local statistics for Buffer1
printf("Thread %zu -> Buffer1 usage: %zu, Buffer count: %d\n",
std::hash<std::thread::id>{}(tid), thread_local_used_size[1],
thread_local_buffer_count[1]);
}

private:
BufferPool() = default;
// global
static std::mutex bufferPoolMutex;

static thread_local int colCount;
static thread_local std::map<uint32_t, uint64_t>
nrBytes;
// thread local
static thread_local bool isInitialized;
static thread_local std::map<uint32_t, std::shared_ptr < ByteBuffer>>
buffers[2];
static std::shared_ptr <DirectIoLib> directIoLib;
static thread_local std::vector<std::shared_ptr<BufferPoolEntry>>
registeredBuffers[2];
static thread_local long global_used_size;
static thread_local long global_free_size;
static thread_local std::shared_ptr<DirectIoLib> directIoLib;
static thread_local int nextRingIndex;
static thread_local std::shared_ptr<BufferPoolEntry>
nextEmptyBufferPoolEntry[2];
static thread_local int colCount;

static thread_local int currBufferIdx;
static thread_local int nextBufferIdx;
static thread_local std::map<uint32_t, std::shared_ptr<ByteBuffer>>
buffersAllocated[2];
friend class DirectUringRandomAccessFile;

static thread_local std::unordered_map<
uint32_t, std::shared_ptr<BufferPoolManagedEntry>>
ringBufferMap[2];

static thread_local size_t thread_local_used_size[2];
static thread_local int thread_local_buffer_count[2];
};
#endif // DUCKDB_BUFFERPOOL_H
Loading