Skip to content

Commit

Permalink
Add: new varchar storage. (infiniflow#1637)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

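Refactor: replace the chunk-based heap storage for varchar values (`fix_heap_mgr_`, addressed by chunk id and chunk offset) with a new variable-length buffer (`var_buffer_mgr_`, addressed by a single file offset).
Bug fix: repair the unit test of the parallel buffer manager.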

### Type of change

- [x] Refactoring
  • Loading branch information
small-turtle-1 authored Aug 13, 2024
1 parent c400cc7 commit 63d7967
Show file tree
Hide file tree
Showing 24 changed files with 395 additions and 251 deletions.
8 changes: 2 additions & 6 deletions src/embedded_infinity/wrap_infinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,13 +883,9 @@ void HandleVarcharType(ColumnField &output_column_field, SizeT row_count, const
std::memcpy(dst.data() + current_offset, &length, sizeof(i32));
std::memcpy(dst.data() + current_offset + sizeof(i32), varchar.short_.data_, varchar.length_);
} else {
auto varchar_ptr = MakeUnique<char[]>(varchar.length_ + 1);
column_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.get(),
varchar.vector_.chunk_id_,
varchar.vector_.chunk_offset_,
varchar.length_);
const char *data = column_vector->buffer_->var_buffer_mgr_->Get(varchar.vector_.file_offset_, varchar.length_);
std::memcpy(dst.data() + current_offset, &length, sizeof(i32));
std::memcpy(dst.data() + current_offset + sizeof(i32), varchar_ptr.get(), varchar.length_);
std::memcpy(dst.data() + current_offset + sizeof(i32), data, varchar.length_);
}
current_offset += sizeof(i32) + varchar.length_;
}
Expand Down
9 changes: 4 additions & 5 deletions src/function/cast/embedding_cast.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,16 @@ inline bool EmbeddingTryCastToVarlen::Run(const EmbeddingT &source,
// inline varchar
std::memcpy(target.short_.data_, res.c_str(), target.length_);
} else {
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kHeap) {
String error_message = fmt::format("Varchar column vector should use MemoryVectorBuffer.");
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kVarBuffer) {
String error_message = fmt::format("Varchar column vector should use VarBuffer.");
UnrecoverableError(error_message);
}

// Set varchar prefix
std::memcpy(target.vector_.prefix_, res.c_str(), VARCHAR_PREFIX_LEN);

auto [chunk_id, chunk_offset] = vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(res.c_str(), target.length_);
target.vector_.chunk_id_ = chunk_id;
target.vector_.chunk_offset_ = chunk_offset;
SizeT offset = vector_ptr->buffer_->var_buffer_mgr_->Append(res.c_str(), target.length_);
target.vector_.file_offset_ = offset;
}

return true;
Expand Down
17 changes: 7 additions & 10 deletions src/function/cast/float_cast.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,12 @@ inline bool FloatTryCastToVarlen::Run(FloatT source, VarcharT &target, ColumnVec
std::memcpy(target.short_.data_, tmp_str.c_str(), target.length_);
} else {
std::memcpy(target.vector_.prefix_, tmp_str.c_str(), VARCHAR_PREFIX_LEN);
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kHeap) {
String error_message = "Varchar column vector should use MemoryVectorBuffer.";
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kVarBuffer) {
String error_message = "Varchar column vector should use VarBuffer.";
UnrecoverableError(error_message);
}
auto [chunk_id, chunk_offset] = vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(tmp_str.c_str(), target.length_);
target.vector_.chunk_id_ = chunk_id;
target.vector_.chunk_offset_ = chunk_offset;
SizeT offset = vector_ptr->buffer_->var_buffer_mgr_->Append(tmp_str.c_str(), target.length_);
target.vector_.file_offset_ = offset;
}

return true;
Expand Down Expand Up @@ -307,16 +306,14 @@ inline bool FloatTryCastToVarlen::Run(DoubleT source, VarcharT &target, ColumnVe
std::memcpy(target.short_.data_, tmp_str.c_str(), target.length_);
} else {
std::memcpy(target.vector_.prefix_, tmp_str.c_str(), VARCHAR_PREFIX_LEN);
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kHeap) {
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kVarBuffer) {
String error_message = "Varchar column vector should use MemoryVectorBuffer. ";
UnrecoverableError(error_message);
}
auto [chunk_id, chunk_offset] = vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(tmp_str.c_str(), target.length_);
target.vector_.chunk_id_ = chunk_id;
target.vector_.chunk_offset_ = chunk_offset;
SizeT offset = vector_ptr->buffer_->var_buffer_mgr_->Append(tmp_str.c_str(), target.length_);
target.vector_.file_offset_ = offset;
}


return true;
}

Expand Down
9 changes: 4 additions & 5 deletions src/function/cast/integer_cast.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -543,13 +543,12 @@ inline bool IntegerTryCastToVarlen::Run(BigIntT source, VarcharT &target, Column
std::memcpy(target.short_.data_, tmp_str.c_str(), target.length_);
} else {
std::memcpy(target.vector_.prefix_, tmp_str.c_str(), VARCHAR_PREFIX_LEN);
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kHeap) {
String error_message = "Varchar column vector should use MemoryVectorBuffer.";
if (vector_ptr->buffer_->buffer_type_ != VectorBufferType::kVarBuffer) {
String error_message = "Varchar column vector should use VarBuffer.";
UnrecoverableError(error_message);
}
auto [chunk_id, chunk_offset] = vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(tmp_str.c_str(), target.length_);
target.vector_.chunk_id_ = chunk_id;
target.vector_.chunk_offset_ = chunk_offset;
SizeT offset = vector_ptr->buffer_->var_buffer_mgr_->Append(tmp_str.c_str(), target.length_);
target.vector_.file_offset_ = offset;
}

return true;
Expand Down
32 changes: 10 additions & 22 deletions src/function/cast/varchar_cast.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,9 @@ inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* sour
{
// varchar is vector
SizeT varchar_len = source.length_;
u32 chunk_id = source.vector_.chunk_id_;
u32 chunk_offset = source.vector_.chunk_offset_;
auto varchar_ptr = MakeUniqueForOverwrite<char[]>(varchar_len + 1);
varchar_ptr[varchar_len] = '\0';
source_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.get(), chunk_id, chunk_offset, varchar_len);
const char *data = source_vector->buffer_->var_buffer_mgr_->Get(source.vector_.file_offset_, varchar_len);

auto [ptr, ec] = std::from_chars(varchar_ptr.get(), varchar_ptr.get() + varchar_len, target);
auto [ptr, ec] = std::from_chars(data, data + varchar_len, target);
if (ec != std::errc()) {
return false;
}
Expand Down Expand Up @@ -344,11 +340,9 @@ inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* sour
{
// varchar is vector
SizeT varchar_len = source.length_;
u32 chunk_id = source.vector_.chunk_id_;
u32 chunk_offset = source.vector_.chunk_offset_;

String varchar_ptr(varchar_len, 0);
source_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.data(), chunk_id, chunk_offset, varchar_len);
const char *data = source_vector->buffer_->var_buffer_mgr_->Get(source.vector_.file_offset_, varchar_len);
String varchar_ptr(data, varchar_len);

// Used in libc++
try {
Expand Down Expand Up @@ -387,11 +381,9 @@ inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* sour
{
// varchar is vector
SizeT varchar_len = source.length_;
u32 chunk_id = source.vector_.chunk_id_;
u32 chunk_offset = source.vector_.chunk_offset_;
const char *data = source_vector->buffer_->var_buffer_mgr_->Get(source.vector_.file_offset_, varchar_len);

String varchar_ptr(varchar_len, 0);
source_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.data(), chunk_id, chunk_offset, varchar_len);
String varchar_ptr(data, varchar_len);

// Used in libc++
try {
Expand Down Expand Up @@ -431,11 +423,9 @@ inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* sour
{
// varchar is vector
SizeT varchar_len = source.length_;
u32 chunk_id = source.vector_.chunk_id_;
u32 chunk_offset = source.vector_.chunk_offset_;
const char *data = source_vector->buffer_->var_buffer_mgr_->Get(source.vector_.file_offset_, varchar_len);

String varchar_ptr(varchar_len, 0);
source_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.data(), chunk_id, chunk_offset, varchar_len);
String varchar_ptr(data, varchar_len);

// Used in libc++
try {
Expand Down Expand Up @@ -474,11 +464,9 @@ inline bool TryCastVarcharVector::Run(const VarcharT &source, ColumnVector* sour
{
// varchar is vector
SizeT varchar_len = source.length_;
u32 chunk_id = source.vector_.chunk_id_;
u32 chunk_offset = source.vector_.chunk_offset_;
const char *data = source_vector->buffer_->var_buffer_mgr_->Get(source.vector_.file_offset_, varchar_len);

String varchar_ptr(varchar_len, 0);
source_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.data(), chunk_id, chunk_offset, varchar_len);
String varchar_ptr(data, varchar_len);

// Used in libc++
try {
Expand Down
8 changes: 2 additions & 6 deletions src/network/infinity_thrift_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2350,13 +2350,9 @@ void InfinityThriftService::HandleVarcharType(infinity_thrift_rpc::ColumnField &
std::memcpy(dst.data() + current_offset, &length, sizeof(i32));
std::memcpy(dst.data() + current_offset + sizeof(i32), varchar.short_.data_, varchar.length_);
} else {
auto varchar_ptr = MakeUnique<char[]>(varchar.length_ + 1);
column_vector->buffer_->fix_heap_mgr_->ReadFromHeap(varchar_ptr.get(),
varchar.vector_.chunk_id_,
varchar.vector_.chunk_offset_,
varchar.length_);
const auto *data = column_vector->buffer_->var_buffer_mgr_->Get(varchar.vector_.file_offset_, length);
std::memcpy(dst.data() + current_offset, &length, sizeof(i32));
std::memcpy(dst.data() + current_offset + sizeof(i32), varchar_ptr.get(), varchar.length_);
std::memcpy(dst.data() + current_offset + sizeof(i32), data, varchar.length_);
}
current_offset += sizeof(i32) + varchar.length_;
}
Expand Down
3 changes: 1 addition & 2 deletions src/parser/type/complex/varchar.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ struct InlineVarchar {

// Out-of-line ("vector") form of a varchar: a fixed-size prefix of the string
// plus the information needed to locate the full payload in the column's buffer.
//
// NOTE(review): this span is a rendered diff with the +/- markers stripped. The
// hunk header reports 1 addition and 2 deletions, and the call sites in this
// commit switch from chunk_id_/chunk_offset_ to file_offset_, so chunk_id_ and
// chunk_offset_ are presumably the REMOVED fields and file_offset_ their
// replacement -- confirm against the real header before relying on this layout.
struct VectorVarchar {
// Leading bytes of the string, zero-initialized; the cast functions in this
// commit fill it with memcpy from the start of the source string.
char prefix_[VARCHAR_PREFIX_LENGTH]{};
// Old locator (presumably removed by this commit): chunk index, formerly passed to ReadFromHeap.
uint32_t chunk_id_{0};
// Old locator (presumably removed by this commit): offset inside that chunk.
uint32_t chunk_offset_{0};
// New locator: the offset returned by var_buffer_mgr_->Append() and later passed to Get().
uint64_t file_offset_{0};
};

struct Varchar {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/buffer/buffer_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module;
import stl;
import buffer_handle;
import buffer_obj;
import logger;
import file_worker_type;

module buffer_handle;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ bool BufferManager::RequestSpace(SizeT need_size) {
round_robin_ = (round_robin_ + 1) % lru_caches_.size();
} while (freed_space + free_space < need_size && round_robin_ != round_robin);
bool free_success = freed_space + free_space >= need_size;
[[maybe_unused]] auto cur_mem_size = current_memory_size_.fetch_add(need_size - freed_space);
[[maybe_unused]] auto cur_mem_size = current_memory_size_.fetch_add(need_size - freed_space); // It's ok to add minus value
return free_success;
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ BufferHandle BufferObj::Load() {
bool BufferObj::Free() {
std::unique_lock<std::mutex> locker(w_locker_, std::defer_lock);
if (!locker.try_lock()) {
return false;
return false; // when other thread is loading or cleaning, return false
}
if (status_ != BufferStatus::kUnloaded) {
String error_message = fmt::format("attempt to free {} buffer object", BufferStatusToString(status_));
Expand Down
7 changes: 5 additions & 2 deletions src/storage/buffer/file_worker/var_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ void VarFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
UnrecoverableError(error_message);
}
const auto *buffer = static_cast<const VarBuffer *>(data_);
auto [data_size, data] = buffer->Finish();
SizeT data_size = buffer->TotalSize();
auto buffer_data = MakeUnique<char[]>(data_size);
char *ptr = buffer_data.get();
buffer->Write(ptr);

LocalFileSystem fs;
u64 nbytes = fs.Write(*file_handler_, data.get(), data_size);
u64 nbytes = fs.Write(*file_handler_, buffer_data.get(), data_size);
if (nbytes != data_size) {
String error_message = fmt::format("Write {} bytes to file failed, only {} bytes written.", data_size, nbytes);
UnrecoverableError(error_message);
Expand Down
Loading

0 comments on commit 63d7967

Please sign in to comment.