Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix array_map crash #55383

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/column/adaptive_nullable_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ class AdaptiveNullableColumn final : public ColumnFactory<NullableColumn, Adapti
}
}

ColumnPtr replicate(const Buffer<uint32_t>& offsets) override {
StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) override {
materialized_nullable();
return NullableColumn::replicate(offsets);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/column/array_view_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace starrocks {

ColumnPtr ArrayViewColumn::replicate(const Buffer<uint32_t>& offsets) {
StatusOr<ColumnPtr> ArrayViewColumn::replicate(const Buffer<uint32_t>& offsets) {
auto dest_size = offsets.size() - 1;
auto new_offsets = UInt32Column::create();
auto new_lengths = UInt32Column::create();
Expand Down
2 changes: 1 addition & 1 deletion be/src/column/array_view_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class ArrayViewColumn final : public ColumnFactory<Column, ArrayViewColumn> {

void put_mysql_row_buffer(MysqlRowBuffer* buf, size_t idx, bool is_binary_protocol = false) const override;

ColumnPtr replicate(const Buffer<uint32_t>& offsets) override;
StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) override;

std::string get_name() const override { return "array-view-" + _elements->get_name(); }

Expand Down
10 changes: 9 additions & 1 deletion be/src/column/binary_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void BinaryColumnBase<T>::append_value_multiple_times(const Column& src, uint32_

//TODO(fzh): optimize copy using SIMD
template <typename T>
ColumnPtr BinaryColumnBase<T>::replicate(const Buffer<uint32_t>& offsets) {
StatusOr<ColumnPtr> BinaryColumnBase<T>::replicate(const Buffer<uint32_t>& offsets) {
auto dest = std::dynamic_pointer_cast<BinaryColumnBase<T>>(BinaryColumnBase<T>::create());
auto& dest_offsets = dest->get_offset();
auto& dest_bytes = dest->get_bytes();
Expand All @@ -151,6 +151,14 @@ ColumnPtr BinaryColumnBase<T>::replicate(const Buffer<uint32_t>& offsets) {
dest_offsets[j + 1] = pos;
}
}

auto ret = dest->upgrade_if_overflow();
if (!ret.ok()) {
return ret.status();
} else if (ret.value() != nullptr) {
return ret.value();
}

return dest;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/column/binary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class BinaryColumnBase final : public ColumnFactory<Column, BinaryColumnBase<T>>
_slices_cache = false;
}

ColumnPtr replicate(const Buffer<uint32_t>& offsets) override;
StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) override;

void fill_default(const Filter& filter) override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class Column {
// for example: column(1,2)->replicate({0,2,5}) = column(1,1,2,2,2)
// FixedLengthColumn, BinaryColumn and ConstColumn override this function for better performance.
// TODO(fzh): optimize replicate() for ArrayColumn, ObjectColumn and others.
virtual ColumnPtr replicate(const Buffer<uint32_t>& offsets) {
virtual StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) {
auto dest = this->clone_empty();
auto dest_size = offsets.size() - 1;
DCHECK(this->size() >= dest_size) << "The size of the source column is less when duplicating it.";
Expand Down
2 changes: 1 addition & 1 deletion be/src/column/const_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void ConstColumn::append_value_multiple_times(const Column& src, uint32_t index,
append(src, index, size);
}

ColumnPtr ConstColumn::replicate(const Buffer<uint32_t>& offsets) {
StatusOr<ColumnPtr> ConstColumn::replicate(const Buffer<uint32_t>& offsets) {
return ConstColumn::create(this->_data->clone_shared(), offsets.back());
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/column/const_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class ConstColumn final : public ColumnFactory<Column, ConstColumn> {

void append_value_multiple_times(const Column& src, uint32_t index, uint32_t size) override;

ColumnPtr replicate(const Buffer<uint32_t>& offsets) override;
StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) override;

bool append_nulls(size_t count) override {
DCHECK_GT(count, 0);
Expand Down
2 changes: 1 addition & 1 deletion be/src/column/fixed_length_column_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void FixedLengthColumnBase<T>::append_value_multiple_times(const Column& src, ui

//TODO(fzh): optimize copy using SIMD
template <typename T>
ColumnPtr FixedLengthColumnBase<T>::replicate(const Buffer<uint32_t>& offsets) {
StatusOr<ColumnPtr> FixedLengthColumnBase<T>::replicate(const Buffer<uint32_t>& offsets) {
auto dest = this->clone_empty();
auto& dest_data = down_cast<FixedLengthColumnBase<T>&>(*dest);
dest_data._data.resize(offsets.back());
Expand Down
2 changes: 1 addition & 1 deletion be/src/column/fixed_length_column_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class FixedLengthColumnBase : public ColumnFactory<Column, FixedLengthColumnBase
_data.resize(_data.size() + count, DefaultValueGenerator<ValueType>::next_value());
}

ColumnPtr replicate(const Buffer<uint32_t>& offsets) override;
StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) override;

void fill_default(const Filter& filter) override;

Expand Down
9 changes: 6 additions & 3 deletions be/src/column/nullable_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,12 @@ void NullableColumn::append_value_multiple_times(const Column& src, uint32_t ind
DCHECK_EQ(_null_column->size(), _data_column->size());
}

ColumnPtr NullableColumn::replicate(const Buffer<uint32_t>& offsets) {
return NullableColumn::create(this->_data_column->replicate(offsets),
std::dynamic_pointer_cast<NullColumn>(this->_null_column->replicate(offsets)));
StatusOr<ColumnPtr> NullableColumn::replicate(const Buffer<uint32_t>& offsets) {
ASSIGN_OR_RETURN(auto data_col, this->_data_column->replicate(offsets));

ASSIGN_OR_RETURN(auto null_col, this->_null_column->replicate(offsets));

return NullableColumn::create(data_col, std::dynamic_pointer_cast<NullColumn>(null_col));
}

bool NullableColumn::append_nulls(size_t count) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/column/nullable_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class NullableColumn : public ColumnFactory<Column, NullableColumn> {
_has_null = true;
return true;
}
ColumnPtr replicate(const Buffer<uint32_t>& offsets) override;
StatusOr<ColumnPtr> replicate(const Buffer<uint32_t>& offsets) override;

size_t memory_usage() const override {
return _data_column->memory_usage() + _null_column->memory_usage() + sizeof(bool);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/stream/aggregate/agg_group_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ Status AggGroupState::output_changes(size_t chunk_size, const Columns& group_by_
auto detail_result_chunk = std::make_shared<Chunk>();
SlotId slot_id = 0;
for (size_t j = 0; j < group_by_columns.size(); j++) {
auto replicated_col = group_by_columns[j]->replicate(replicate_offsets);
ASSIGN_OR_RETURN(auto replicated_col, group_by_columns[j]->replicate(replicate_offsets))
detail_result_chunk->append_column(replicated_col, slot_id++);
}
// TODO: take care slot_ids.
Expand Down
8 changes: 5 additions & 3 deletions be/src/exprs/array_map_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ StatusOr<ColumnPtr> ArrayMapExpr::evaluate_lambda_expr(ExprContext* context, Chu
} else {
if (captured_column->is_array()) {
auto view_column = ArrayViewColumn::from_array_column(captured_column);
cur_chunk->append_column(view_column->replicate(aligned_offsets->get_data()), slot_id);
ASSIGN_OR_RETURN(auto replicated_view_column, view_column->replicate(aligned_offsets->get_data()));
cur_chunk->append_column(replicated_view_column, slot_id);
} else {
cur_chunk->append_column(captured_column->replicate(aligned_offsets->get_data()), slot_id);
ASSIGN_OR_RETURN(auto replicated_column, captured_column->replicate(aligned_offsets->get_data()));
cur_chunk->append_column(replicated_column, slot_id);
}
}
}
Expand All @@ -188,7 +190,7 @@ StatusOr<ColumnPtr> ArrayMapExpr::evaluate_lambda_expr(ExprContext* context, Chu
ASSIGN_OR_RETURN(tmp_col, context->evaluate(_children[0], cur_chunk.get()));
}
tmp_col->check_or_die();
column = tmp_col->replicate(aligned_offsets->get_data());
ASSIGN_OR_RETURN(column, tmp_col->replicate(aligned_offsets->get_data()));
column = ColumnHelper::align_return_type(column, type().children[0], column->size(), true);
} else {
// if all input arguments are constant and lambda expr doesn't rely on other capture columns,
Expand Down
4 changes: 3 additions & 1 deletion be/src/exprs/map_apply_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ StatusOr<ColumnPtr> MapApplyExpr::evaluate_checked(ExprContext* context, Chunk*
return Status::InternalError(fmt::format("The size of the captured column {} is less than map's size.",
captured->get_name()));
}
cur_chunk->append_column(captured->replicate(input_map->offsets_column()->get_data()), id);

ASSIGN_OR_RETURN(auto replicated_col, captured->replicate(input_map->offsets_column()->get_data()));
cur_chunk->append_column(replicated_col, id);
}
// evaluate the lambda expression
if (cur_chunk->num_rows() <= chunk->num_rows() * 8) {
Expand Down
2 changes: 1 addition & 1 deletion be/test/column/array_column_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ PARALLEL_TEST(ArrayColumnTest, test_replicate) {
off.push_back(5);
off.push_back(7);

auto res = column->replicate(off);
auto res = column->replicate(off).value();

ASSERT_EQ("[1,2,3]", res->debug_item(0));
ASSERT_EQ("[1,2,3]", res->debug_item(1));
Expand Down
2 changes: 1 addition & 1 deletion be/test/column/array_view_column_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ PARALLEL_TEST(ArrayViewColumnTest, test_other_manipulations) {
auto array_view_column =
std::dynamic_pointer_cast<ArrayViewColumn>(ArrayViewColumn::from_array_column(array_column));
Buffer<uint32_t> offsets{0, 2, 3, 6};
auto column = array_view_column->replicate(offsets);
auto column = array_view_column->replicate(offsets).value();
ASSERT_TRUE(column->is_array_view());
auto result = std::dynamic_pointer_cast<ArrayViewColumn>(column);
ASSERT_EQ(result->size(), 6);
Expand Down
2 changes: 1 addition & 1 deletion be/test/column/binary_column_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ PARALLEL_TEST(BinaryColumnTest, test_replicate) {
offsets.push_back(3);
offsets.push_back(5);

auto c2 = c1->replicate(offsets);
auto c2 = c1->replicate(offsets).value();

auto slices = down_cast<BinaryColumn*>(c2.get())->get_data();
ASSERT_EQ(5, c2->size());
Expand Down
2 changes: 1 addition & 1 deletion be/test/column/const_column_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ PARALLEL_TEST(ConstColumnTest, test_replicate) {
offsets.push_back(5);
offsets.push_back(7);

auto c2 = c1->replicate(offsets);
auto c2 = c1->replicate(offsets).value();

ASSERT_EQ(7, c2->size());
ASSERT_EQ(1, c2->get(6).get_int32());
Expand Down
2 changes: 1 addition & 1 deletion be/test/column/fixed_length_column_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ TEST(FixedLengthColumnTest, test_replicate) {
offsets.push_back(3);
offsets.push_back(5);

auto c2 = column->replicate(offsets);
auto c2 = column->replicate(offsets).value();
ASSERT_EQ(5, c2->size());
ASSERT_EQ(c2->get(0).get_int32(), 7);
ASSERT_EQ(c2->get(1).get_int32(), 7);
Expand Down
2 changes: 1 addition & 1 deletion be/test/column/nullable_column_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ PARALLEL_TEST(NullableColumnTest, test_replicate) {
offsets.push_back(2);
offsets.push_back(4);
offsets.push_back(7);
auto c2 = column->replicate(offsets);
auto c2 = column->replicate(offsets).value();

ASSERT_EQ(1, c2->get(0).get_int32());
ASSERT_EQ(1, c2->get(1).get_int32());
Expand Down
Loading