Skip to content

Commit

Permalink
[Enhancement] Optimize serialize_batch for nullable/const column (Sta…
Browse files Browse the repository at this point in the history
…rRocks#55374)

Signed-off-by: zihe.liu <ziheliu1024@gmail.com>
  • Loading branch information
ZiheLiu authored Feb 6, 2025
1 parent 6e8be51 commit 52691d3
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 18 deletions.
47 changes: 35 additions & 12 deletions be/src/column/binary_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,18 +538,6 @@ uint32_t BinaryColumnBase<T>::max_one_element_serialize_size() const {
return max_size + sizeof(uint32_t);
}

// Writes element `idx` to `pos` as a uint32_t byte length followed by the element's raw bytes.
// Returns the number of bytes written (length prefix + payload).
template <typename T>
uint32_t BinaryColumnBase<T>::serialize(size_t idx, uint8_t* pos) {
    const T start = _offsets[idx];
    // max size of one string is 2^32, so use uint32_t not T
    const uint32_t length = static_cast<uint32_t>(_offsets[idx + 1] - _offsets[idx]);

    // Length prefix first, payload immediately after it.
    uint8_t* const payload_pos = pos + sizeof(uint32_t);
    strings::memcpy_inlined(pos, &length, sizeof(uint32_t));
    strings::memcpy_inlined(payload_pos, &_bytes[start], length);

    return sizeof(uint32_t) + length;
}

template <typename T>
uint32_t BinaryColumnBase<T>::serialize_default(uint8_t* pos) {
// max size of one string is 2^32, so use uint32_t not T
Expand Down Expand Up @@ -590,6 +578,41 @@ void BinaryColumnBase<T>::deserialize_and_append_batch(Buffer<Slice>& srcs, size
}
}

// Appends one serialized nullable value per row into a strided destination buffer.
// Row `r` lives at `dst + r * max_one_row_size`; `slice_sizes[r]` is the number of bytes already
// written for that row and is advanced by the bytes added here.
// Per-row layout written: one null-flag byte, then (for non-null rows) the output of serialize().
// `null_masks` is only read when `has_null` is true.
template <typename T>
void BinaryColumnBase<T>::serialize_batch_with_null_masks(uint8_t* dst, Buffer<uint32_t>& slice_sizes,
                                                          size_t chunk_size, uint32_t max_one_row_size,
                                                          uint8_t* null_masks, bool has_null) {
    uint32_t* row_sizes = slice_sizes.data();

    if (has_null) {
        for (size_t row = 0; row < chunk_size; ++row) {
            uint8_t* flag_pos = dst + row * max_one_row_size + row_sizes[row];
            memcpy(flag_pos, null_masks + row, sizeof(bool));
            row_sizes[row] += sizeof(bool);

            if (null_masks[row]) {
                continue; // null rows carry only the flag byte
            }
            row_sizes[row] += serialize(row, flag_pos + sizeof(bool));
        }
        return;
    }

    // No nulls in this batch: every row gets a `false` flag followed by its payload.
    for (size_t row = 0; row < chunk_size; ++row) {
        uint8_t* flag_pos = dst + row * max_one_row_size + row_sizes[row];
        memcpy(flag_pos, &has_null, sizeof(bool));
        const uint32_t payload_bytes = serialize(row, flag_pos + sizeof(bool));
        row_sizes[row] += static_cast<uint32_t>(sizeof(bool)) + payload_bytes;
    }
}

// Deserializes `chunk_size` nullable rows from `srcs` and appends them to this column.
// Each row is expected to start with a one-byte null flag; non-null rows follow it with a uint32_t
// length prefix and the string bytes. The actual decoding is done by the ColumnFactory implementation;
// this override only pre-reserves `_bytes` using the first row as a size estimate.
//   srcs       - one slice per row; the base implementation advances each slice's data pointer
//   chunk_size - number of rows to consume (may be 0)
//   is_nulls   - receives one null flag per row
//   has_null   - set to true by the base implementation when a null row is seen
template <typename T>
void BinaryColumnBase<T>::deserialize_and_append_batch_nullable(Buffer<Slice>& srcs, size_t chunk_size,
                                                                Buffer<uint8_t>& is_nulls, bool& has_null) {
    // The reserve below is only a capacity hint. Skip it for an empty batch so srcs[0] is never touched
    // when there is no first row.
    if (chunk_size > 0) {
        const char* first_row = srcs[0].data;

        // Read the flag as a raw byte rather than through a bool*.
        uint8_t first_is_null = 0;
        memcpy(&first_is_null, first_row, sizeof(first_is_null));

        // Fallback guess of 4 bytes per string when the first row is null and has no length prefix.
        uint32_t string_size = 4;
        if (first_is_null == 0) {
            // The length prefix sits one byte past the row start, so it is not guaranteed to be
            // 4-byte aligned; memcpy avoids an unaligned uint32_t load.
            memcpy(&string_size, first_row + sizeof(bool), sizeof(uint32_t));
        }
        _bytes.reserve(chunk_size * string_size * 2);
    }

    ColumnFactory<Column, BinaryColumnBase<T>>::deserialize_and_append_batch_nullable(srcs, chunk_size, is_nulls,
                                                                                      has_null);
}

template <typename T>
void BinaryColumnBase<T>::fnv_hash(uint32_t* hashes, uint32_t from, uint32_t to) const {
for (uint32_t i = from; i < to; ++i) {
Expand Down
18 changes: 17 additions & 1 deletion be/src/column/binary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "column/datum.h"
#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "gutil/strings/fastmem.h"
#include "util/slice.h"

namespace starrocks {
Expand Down Expand Up @@ -224,17 +225,32 @@ class BinaryColumnBase final : public ColumnFactory<Column, BinaryColumnBase<T>>

uint32_t max_one_element_serialize_size() const override;

uint32_t serialize(size_t idx, uint8_t* pos) override;
// Serializes element `idx` into `pos`: a uint32_t byte length, then the element's bytes.
// Returns the total number of bytes written.
ALWAYS_INLINE uint32_t serialize(size_t idx, uint8_t* pos) override {
    const T begin = _offsets[idx];
    // max size of one string is 2^32, so use uint32_t not T
    const auto payload_len = static_cast<uint32_t>(_offsets[idx + 1] - _offsets[idx]);

    uint8_t* const payload_dst = pos + sizeof(uint32_t);
    strings::memcpy_inlined(pos, &payload_len, sizeof(uint32_t));
    strings::memcpy_inlined(payload_dst, &_bytes[begin], payload_len);

    return sizeof(uint32_t) + payload_len;
}

uint32_t serialize_default(uint8_t* pos) override;

void serialize_batch(uint8_t* dst, Buffer<uint32_t>& slice_sizes, size_t chunk_size,
uint32_t max_one_row_size) override;

void serialize_batch_with_null_masks(uint8_t* dst, Buffer<uint32_t>& slice_sizes, size_t chunk_size,
uint32_t max_one_row_size, uint8_t* null_masks, bool has_null) override;

const uint8_t* deserialize_and_append(const uint8_t* pos) override;

void deserialize_and_append_batch(Buffer<Slice>& srcs, size_t chunk_size) override;

void deserialize_and_append_batch_nullable(Buffer<Slice>& srcs, size_t chunk_size, Buffer<uint8_t>& is_nulls,
bool& has_null) override;

uint32_t serialize_size(size_t idx) const override {
// max size of one string is 2^32, so use sizeof(uint32_t) not sizeof(T)
return static_cast<uint32_t>(sizeof(uint32_t) + _offsets[idx + 1] - _offsets[idx]);
Expand Down
22 changes: 22 additions & 0 deletions be/src/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/statusor.h"
#include "gutil/casts.h"
#include "storage/delete_condition.h" // for DelCondSatisfied
#include "util/slice.h"

namespace starrocks {

Expand Down Expand Up @@ -316,6 +317,9 @@ class Column {
virtual void serialize_batch_with_null_masks(uint8_t* dst, Buffer<uint32_t>& slice_sizes, size_t chunk_size,
uint32_t max_one_row_size, uint8_t* null_masks, bool has_null);

virtual void deserialize_and_append_batch_nullable(Buffer<Slice>& srcs, size_t chunk_size,
Buffer<uint8_t>& is_nulls, bool& has_null) = 0;

// deserialize one data and append to this column
virtual const uint8_t* deserialize_and_append(const uint8_t* pos) = 0;

Expand Down Expand Up @@ -497,6 +501,24 @@ class ColumnFactory : public Base {
// Visitor entry point: passes this object, typed as Derived*, to the mutable visitor and
// returns the visitor's Status.
Status accept_mutable(ColumnVisitorMutable* visitor) override {
    auto* self = static_cast<Derived*>(this);
    return visitor->visit(self);
}

// Generic batch decoder for nullable rows.
// For each of the `chunk_size` slices: reads a one-byte null flag, advances the slice past it,
// records the flag in `is_nulls`, then either decodes the value into the derived column (non-null)
// or appends a default value and raises `has_null` (null). `has_null` is never reset to false here.
void deserialize_and_append_batch_nullable(Buffer<Slice>& srcs, size_t chunk_size, Buffer<uint8_t>& is_nulls,
                                           bool& has_null) override {
    is_nulls.reserve(is_nulls.size() + chunk_size);

    for (size_t row = 0; row < chunk_size; ++row) {
        bool is_null;
        memcpy(&is_null, srcs[row].data, sizeof(bool));
        srcs[row].data += sizeof(bool);
        is_nulls.emplace_back(is_null);

        if (is_null) {
            // Null rows have no payload; keep the data column aligned with the null column.
            has_null = true;
            mutable_derived()->append_default();
            continue;
        }

        // The decoder returns the position just past the consumed value; store it back into the slice.
        srcs[row].data = (char*)mutable_derived()->deserialize_and_append((uint8_t*)srcs[row].data);
    }
}
};

} // namespace starrocks
13 changes: 11 additions & 2 deletions be/src/column/const_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,17 @@ class ConstColumn final : public ColumnFactory<Column, ConstColumn> {

// Serializes the constant value into each of the `chunk_size` row buffers.
// Row `i` lives at `dst + i * max_one_row_size`; `slice_sizes[i]` is the number of bytes already
// written for that row and is advanced by the serialized size.
// The value is serialized once (into row 0) and the resulting bytes are copied to the other rows.
void serialize_batch(uint8_t* dst, Buffer<uint32_t>& slice_sizes, size_t chunk_size,
                     uint32_t max_one_row_size) override {
    // chunk_size is unsigned, so "empty" is exactly zero; slice_sizes[0] must not be read then.
    if (chunk_size == 0) {
        return;
    }

    // Row 0 starts at dst itself; capture its write position before slice_sizes[0] is advanced,
    // because it is the copy source for all remaining rows.
    auto* first_row_buf = dst + slice_sizes[0];
    const size_t first_row_bytes = _data->serialize(0, first_row_buf);
    slice_sizes[0] += first_row_bytes;

    for (size_t i = 1; i < chunk_size; ++i) {
        strings::memcpy_inlined(dst + i * max_one_row_size + slice_sizes[i], first_row_buf, first_row_bytes);
        slice_sizes[i] += first_row_bytes;
    }
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/column/nullable_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,7 @@ const uint8_t* NullableColumn::deserialize_and_append(const uint8_t* pos) {
}

// Deserializes `chunk_size` nullable rows from `srcs` and appends them to this column.
// The work is delegated to the data column's batch routine, which is handed this column's
// null buffer and `_has_null` flag to fill in alongside the data values.
void NullableColumn::deserialize_and_append_batch(Buffer<Slice>& srcs, size_t chunk_size) {
    // Each slice must be consumed exactly once: this single batch call is the only reader of `srcs`,
    // so no per-row deserialize_and_append() pass may run before or after it.
    _data_column->deserialize_and_append_batch_nullable(srcs, chunk_size, null_column_data(), _has_null);
}

// Note: the hash function should be same with RawValue::get_hash_value_fvn
Expand Down

0 comments on commit 52691d3

Please sign in to comment.