Skip to content

Commit

Permalink
[enhance](column) add has_enough_capacity function interface for colu…
Browse files Browse the repository at this point in the history
…mn and fix analytic operator string overflow (apache#47364)

### What problem does this PR solve?
Problem Summary:

1. add has_enough_capacity function interface for column
2. change the sorter conditions for sort data, when need_bytes >
capacity_bytes - used_bytes,
    means the left bytes is not enough so need call do_sort().
3. fix column string overflow in analytic operator, the fixed must in
this PR, as the test case without it will failed
  • Loading branch information
zhangstar333 authored Feb 8, 2025
1 parent 0b49470 commit cc8f61c
Show file tree
Hide file tree
Showing 26 changed files with 160 additions and 26 deletions.
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
}

void AnalyticSinkLocalState::_remove_unused_rows() {
// test column overflow 4G
DBUG_EXECUTE_IF("AnalyticSinkLocalState._remove_unused_rows", { return; });
const size_t block_num = 256;
if (_removed_block_index + block_num + 1 >= _input_block_first_row_positions.size()) {
return;
Expand Down Expand Up @@ -845,7 +847,9 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
RETURN_IF_ERROR(expr->execute(block, &result_col_id));
DCHECK_GE(result_col_id, 0);
auto column = block->get_by_position(result_col_id).column->convert_to_full_column_if_const();
dst_column->insert_range_from(*column, 0, length);
// iff dst_column is string, maybe overflow of 4G, so need ignore overflow
// the column is used by compare_at self to find the range, it's need convert it when overflow?
dst_column->insert_range_from_ignore_overflow(*column, 0, length);
return Status::OK();
}

Expand Down
11 changes: 11 additions & 0 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,17 @@ class IColumn : public COW<IColumn> {
/// Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined.
virtual size_t byte_size() const = 0;

/**
* @brief Checks whether the current column has enough capacity to accommodate the given source column.
*
* This pure virtual function should be implemented by derived classes to determine whether the
* current column has enough reserved memory to hold the data of the specified `src` column.
*
* @param src The source column whose data needs to be checked for fitting into the current column.
* @return true if the current column has enough capacity to hold the `src` data, false otherwise.
*/
virtual bool has_enough_capacity(const IColumn& src) const = 0;

/// Size of memory, allocated for column.
/// This is greater or equals to byte_size due to memory reservation in containers.
/// Zero, if could not be determined.
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ size_t ColumnArray::allocated_bytes() const {
return get_data().allocated_bytes() + get_offsets().allocated_bytes();
}

bool ColumnArray::has_enough_capacity(const IColumn& src) const {
const auto& src_concrete = assert_cast<const ColumnArray&>(src);
return get_data().has_enough_capacity(src_concrete.get_data()) &&
get_offsets_column().has_enough_capacity(src_concrete.get_offsets_column());
}

ColumnPtr ColumnArray::convert_to_full_column_if_const() const {
/// It is possible to have an array with constant data and non-constant offsets.
/// Example is the result of expression: replicate('hello', [1])
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
void reserve(size_t n) override;
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;
void insert_many_from(const IColumn& src, size_t position, size_t length) override;

Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/columns/column_complex.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class ColumnComplexType final : public COWHelper<IColumn, ColumnComplexType<T>>

size_t allocated_bytes() const override { return byte_size(); }

bool has_enough_capacity(const IColumn& src) const override {
const Self& src_vec = assert_cast<const Self&>(src);
return data.capacity() - data.size() > src_vec.size();
}

void insert_value(T value) { data.emplace_back(std::move(value)); }

void reserve(size_t n) override { data.reserve(n); }
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {

size_t allocated_bytes() const override { return data->allocated_bytes() + sizeof(s); }

bool has_enough_capacity(const IColumn& src) const override { return true; }

int compare_at(size_t, size_t, const IColumn& rhs, int nan_direction_hint) const override {
auto rhs_const_column = assert_cast<const ColumnConst&, TypeCheckOnRelease::DISABLE>(rhs);

Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/columns/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class ColumnDecimal final : public COWHelper<IColumn, ColumnDecimal<T>> {
size_t size() const override { return data.size(); }
size_t byte_size() const override { return data.size() * sizeof(data[0]); }
size_t allocated_bytes() const override { return data.allocated_bytes(); }
bool has_enough_capacity(const IColumn& src) const override {
const ColumnDecimal& src_vec = assert_cast<const ColumnDecimal&>(src);
return data.capacity() - data.size() > src_vec.size();
}
void reserve(size_t n) override { data.reserve(n); }
void resize(size_t n) override { data.resize(n); }

Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_dictionary.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ class ColumnDictionary final : public COWHelper<IColumn, ColumnDictionary<T>> {

size_t allocated_bytes() const override { return byte_size(); }

bool has_enough_capacity(const IColumn& src) const override {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"has_enough_capacity not supported in ColumnDictionary");
__builtin_unreachable();
}

void pop_back(size_t n) override {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"pop_back not supported in ColumnDictionary");
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_dummy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class IColumnDummy : public IColumn {
void pop_back(size_t n) override { s -= n; }
size_t byte_size() const override { return 0; }
size_t allocated_bytes() const override { return 0; }
bool has_enough_capacity(const IColumn& src) const override { return false; }
int compare_at(size_t, size_t, const IColumn&, int) const override { return 0; }

[[noreturn]] Field operator[](size_t) const override {
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/columns/column_fixed_length_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ class ColumnFixedLengthObject final : public COWHelper<IColumn, ColumnFixedLengt

size_t allocated_bytes() const override { return _data.allocated_bytes(); }

bool has_enough_capacity(const IColumn& src) const override {
const auto& src_col = assert_cast<const ColumnFixedLengthObject&>(src);
return _data.capacity() - _data.size() > src_col.size();
}

//NOTICE: here is replace: this[self_row] = rhs[row]
//But column string is replaced all when self_row = 0
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,13 @@ size_t ColumnMap::allocated_bytes() const {
get_offsets().allocated_bytes();
}

bool ColumnMap::has_enough_capacity(const IColumn& src) const {
const auto& src_concrete = assert_cast<const ColumnMap&>(src);
return keys_column->has_enough_capacity(*src_concrete.keys_column) &&
values_column->has_enough_capacity(*src_concrete.values_column) &&
offsets_column->has_enough_capacity(*src_concrete.offsets_column);
}

ColumnPtr ColumnMap::convert_to_full_column_if_const() const {
return ColumnMap::create(keys_column->convert_to_full_column_if_const(),
values_column->convert_to_full_column_if_const(),
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
void resize(size_t n) override;
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;

void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,12 @@ size_t ColumnNullable::allocated_bytes() const {
return get_nested_column().allocated_bytes() + get_null_map_column().allocated_bytes();
}

bool ColumnNullable::has_enough_capacity(const IColumn& src) const {
const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
return get_nested_column().has_enough_capacity(src_concrete.get_nested_column()) &&
get_null_map_column().has_enough_capacity(src_concrete.get_null_map_column());
}

ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const {
ColumnPtr replicated_data = get_nested_column().replicate(offsets);
ColumnPtr replicated_null_map = get_null_map_column().replicate(offsets);
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>, public N
void resize(size_t n) override;
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

size_t allocatedBytes() const;

bool has_enough_capacity(const IColumn& src) const { return false; };

bool is_finalized() const;

const DataTypePtr& get_least_common_type() const { return least_common_type.get(); }
Expand Down Expand Up @@ -386,6 +388,8 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

size_t allocated_bytes() const override;

bool has_enough_capacity(const IColumn& src) const override { return false; }

void for_each_subcolumn(ColumnCallback callback) override;

// Do nothing, call try_insert instead
Expand Down
19 changes: 14 additions & 5 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ void ColumnStr<T>::insert_range_from_ignore_overflow(const doris::vectorized::IC
}
}

template <typename T>
bool ColumnStr<T>::has_enough_capacity(const IColumn& src) const {
const auto& src_concrete = assert_cast<const ColumnStr<T>&>(src);
return (this->get_chars().capacity() - this->get_chars().size() >
src_concrete.get_chars().size()) &&
(this->get_offsets().capacity() - this->get_offsets().size() >
src_concrete.get_offsets().size());
}

template <typename T>
void ColumnStr<T>::insert_range_from(const IColumn& src, size_t start, size_t length) {
if (length == 0) {
Expand All @@ -166,7 +175,7 @@ void ColumnStr<T>::insert_range_from(const IColumn& src, size_t start, size_t le
auto nested_length = src_offsets[start + length - 1] - nested_offset;

size_t old_chars_size = chars.size();
check_chars_length(old_chars_size + nested_length, offsets.size() + length);
check_chars_length(old_chars_size + nested_length, offsets.size() + length, size());
chars.resize(old_chars_size + nested_length);
memcpy(&chars[old_chars_size], &src_chars[nested_offset], nested_length);

Expand Down Expand Up @@ -200,7 +209,7 @@ void ColumnStr<T>::insert_many_from(const IColumn& src, size_t position, size_t
auto [data_val, data_length] = string_column.get_data_at(position);

size_t old_chars_size = chars.size();
check_chars_length(old_chars_size + data_length * length, offsets.size() + length);
check_chars_length(old_chars_size + data_length * length, offsets.size() + length, size());
chars.resize(old_chars_size + data_length * length);

auto old_size = offsets.size();
Expand Down Expand Up @@ -235,7 +244,7 @@ void ColumnStr<T>::insert_indices_from(const IColumn& src, const uint32_t* indic
// if Offsets is uint32, size will not exceed range of uint32, cast is OK.
dst_offsets_data[dst_offsets_pos++] = static_cast<T>(total_chars_size);
}
check_chars_length(total_chars_size, offsets.size());
check_chars_length(total_chars_size, offsets.size(), dst_offsets_pos);

chars.resize(total_chars_size);

Expand Down Expand Up @@ -418,7 +427,7 @@ const char* ColumnStr<T>::deserialize_and_insert_from_arena(const char* pos) {

const size_t old_size = chars.size();
const size_t new_size = old_size + string_size;
check_chars_length(new_size, offsets.size() + 1);
check_chars_length(new_size, offsets.size() + 1, size());
chars.resize(new_size);
memcpy(chars.data() + old_size, pos, string_size);

Expand Down Expand Up @@ -582,7 +591,7 @@ ColumnPtr ColumnStr<T>::replicate(const IColumn::Offsets& replicate_offsets) con
prev_string_offset = offsets[i];
}

check_chars_length(res_chars.size(), res_offsets.size());
check_chars_length(res_chars.size(), res_offsets.size(), col_size);
return res;
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {

static constexpr size_t MAX_STRINGS_OVERFLOW_SIZE = 128;

void static check_chars_length(size_t total_length, size_t element_number) {
void static check_chars_length(size_t total_length, size_t element_number, size_t rows = 0) {
if (UNLIKELY(total_length > MAX_STRING_SIZE)) {
throw Exception(
ErrorCode::STRING_OVERFLOW_IN_VEC_ENGINE,
"string column length is too large: total_length={}, element_number={}, "
"you can set batch_size a number smaller than {} to avoid this error",
total_length, element_number, element_number);
"you can set batch_size a number smaller than {} to avoid this error. rows:{}",
total_length, element_number, element_number, rows);
}
}

Expand Down Expand Up @@ -117,6 +117,8 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {

size_t byte_size() const override { return chars.size() + offsets.size() * sizeof(offsets[0]); }

bool has_enough_capacity(const IColumn& src) const override;

size_t allocated_bytes() const override {
return chars.allocated_bytes() + offsets.allocated_bytes();
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/columns/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,16 @@ size_t ColumnStruct::allocated_bytes() const {
return res;
}

bool ColumnStruct::has_enough_capacity(const IColumn& src) const {
const auto& src_concrete = assert_cast<const ColumnStruct&>(src);
for (size_t i = 0; i < columns.size(); ++i) {
if (!columns[i]->has_enough_capacity(*src_concrete.columns[i])) {
return false;
}
}
return true;
}

void ColumnStruct::for_each_subcolumn(ColumnCallback callback) {
for (auto& column : columns) {
callback(column);
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
void resize(size_t n) override;
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;
void for_each_subcolumn(ColumnCallback callback) override;
bool structure_equals(const IColumn& rhs) const override;

Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/columns/column_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {

size_t allocated_bytes() const override { return data.allocated_bytes(); }

bool has_enough_capacity(const IColumn& src) const override {
const auto& src_vec = assert_cast<const ColumnVector&>(src);
return data.capacity() - data.size() > src_vec.data.size();
}

void insert_value(const T value) { data.push_back(value); }

/// This method implemented in header because it could be possibly devirtualized.
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/columns/predicate_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ class PredicateColumnType final : public COWHelper<IColumn, PredicateColumnType<

size_t allocated_bytes() const override { return byte_size(); }

bool has_enough_capacity(const IColumn& src) const override {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"has_enough_capacity not supported in PredicateColumnType");
}

void reserve(size_t n) override { data.reserve(n); }

std::string get_name() const override { return TypeName<T>::get(); }
Expand Down
18 changes: 16 additions & 2 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/object_pool.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -210,11 +211,23 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
_state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)) {}

// check whether the unsorted block can hold more data from input block and no need to alloc new memory
bool FullSorter::has_enough_capacity(Block* input_block, Block* unsorted_block) const {
DCHECK_EQ(input_block->columns(), unsorted_block->columns());
for (auto i = 0; i < input_block->columns(); ++i) {
if (!unsorted_block->get_by_position(i).column->has_enough_capacity(
*input_block->get_by_position(i).column)) {
return false;
}
}
return true;
}

Status FullSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);

if (_reach_limit() && block->bytes() > _state->unsorted_block()->allocated_bytes() -
_state->unsorted_block()->bytes()) {
// iff have reach limit and the unsorted block capacity can't hold the block data size
if (_reach_limit() && !has_enough_capacity(block, _state->unsorted_block().get())) {
RETURN_IF_ERROR(_do_sort());
}

Expand Down Expand Up @@ -260,6 +273,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::vectori
Status FullSorter::_do_sort() {
Block* src_block = _state->unsorted_block().get();
Block desc_block = src_block->clone_without_columns();
COUNTER_UPDATE(_partial_sort_counter, 1);
RETURN_IF_ERROR(partial_sort(*src_block, desc_block));

// dispose TOP-N logic
Expand Down
Loading

0 comments on commit cc8f61c

Please sign in to comment.