Skip to content

Commit

Permalink
opt invalid data
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Oct 16, 2023
1 parent c4fe431 commit d2dc461
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 62 deletions.
78 changes: 31 additions & 47 deletions be/src/vec/sink/vtablet_block_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows));
}

int64_t filtered_rows = 0;
int filtered_rows = 0;
{
SCOPED_RAW_TIMER(&_validate_data_ns);
_filter_bitmap.Reset(block->rows());
_filter_map.resize(rows, 0);
bool stop_processing = false;
RETURN_IF_ERROR(_validate_data(state, block.get(), filtered_rows, &stop_processing));
RETURN_IF_ERROR(_validate_data(state, block.get(), rows, filtered_rows, &stop_processing));
_num_filtered_rows += filtered_rows;
has_filtered_rows = filtered_rows > 0;
if (stop_processing) {
Expand Down Expand Up @@ -161,11 +161,12 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
bool is_nullable, vectorized::ColumnPtr column,
size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix,
const uint32_t row_count,
vectorized::IColumn::Permutation* rows) {
DCHECK((rows == nullptr) || (rows->size() == column->size()));
DCHECK((rows == nullptr) || (rows->size() == row_count));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
_filter_bitmap.Set(row, true);
_filter_map[row] = true;
auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; },
[&error_prefix, &error_msg]() -> std::string {
return fmt::to_string(error_prefix) +
Expand All @@ -180,10 +181,9 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
auto& real_column_ptr = column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr());
auto null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data();
auto need_to_validate = [&null_map, this](size_t j, size_t row) {
return !_filter_bitmap.Get(row) && (null_map == nullptr || null_map[j] == 0);
return !_filter_map[row] && (null_map == nullptr || null_map[j] == 0);
};

ssize_t last_invalid_row = -1;
switch (type.type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
Expand All @@ -197,24 +197,19 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
limit = std::min(config::string_type_length_soft_limit_bytes, type.len);
}

const auto column_size = column_string->size();
auto* __restrict offsets = column_string->get_offsets().data();
int invalid_count = 0;
for (int j = 0; j < column_size; ++j) {
for (int j = 0; j < row_count; ++j) {
invalid_count += (offsets[j] - offsets[j - 1]) > limit;
}

if (invalid_count) {
for (size_t j = 0; j < column->size(); ++j) {
for (size_t j = 0; j < row_count; ++j) {
auto row = rows ? (*rows)[j] : j;
if (row == last_invalid_row) {
continue;
}
if (need_to_validate(j, row)) {
auto str_val = column_string->get_data_at(j);
bool invalid = str_val.size > limit;
if (invalid) {
last_invalid_row = row;
if (str_val.size > type.len) {
fmt::format_to(error_msg, "{}",
"the length of input is too long than schema. ");
Expand All @@ -240,10 +235,10 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
break;
}
case TYPE_JSONB: {
const auto column_string =
const auto* column_string =
assert_cast<const vectorized::ColumnString*>(real_column_ptr.get());
for (size_t j = 0; j < column->size(); ++j) {
if (!_filter_bitmap.Get(j)) {
for (size_t j = 0; j < row_count; ++j) {
if (!_filter_map[j]) {
if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
continue;
}
Expand All @@ -259,16 +254,13 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
break;
}
case TYPE_DECIMALV2: {
auto column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
auto* column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128>*>(
real_column_ptr.get()));
const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
for (size_t j = 0; j < column->size(); ++j) {
for (size_t j = 0; j < row_count; ++j) {
auto row = rows ? (*rows)[j] : j;
if (row == last_invalid_row) {
continue;
}
if (need_to_validate(j, row)) {
auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
column_decimal->get_data()[j]);
Expand All @@ -295,7 +287,6 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
}

if (invalid) {
last_invalid_row = row;
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
Expand All @@ -308,19 +299,15 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
assert_cast<const vectorized::ColumnDecimal<DecimalType>*>(real_column_ptr.get())); \
const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type); \
const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type); \
const auto column_size = column_decimal->size(); \
const auto* __restrict datas = column_decimal->get_data().data(); \
int invalid_count = 0; \
for (int j = 0; j < column_size; ++j) { \
for (int j = 0; j < row_count; ++j) { \
const auto dec_val = datas[j]; \
invalid_count += dec_val > max_decimal || dec_val < min_decimal; \
} \
if (invalid_count) { \
for (size_t j = 0; j < column->size(); ++j) { \
for (size_t j = 0; j < row_count; ++j) { \
auto row = rows ? (*rows)[j] : j; \
if (row == last_invalid_row) { \
continue; \
} \
if (need_to_validate(j, row)) { \
auto dec_val = column_decimal->get_data()[j]; \
bool invalid = false; \
Expand All @@ -333,7 +320,6 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
invalid = true; \
} \
if (invalid) { \
last_invalid_row = row; \
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \
} \
} \
Expand All @@ -352,21 +338,21 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
}
#undef CHECK_VALIDATION_FOR_DECIMALV3
case TYPE_ARRAY: {
const auto column_array =
const auto* column_array =
assert_cast<const vectorized::ColumnArray*>(real_column_ptr.get());
DCHECK(type.children.size() == 1);
auto nested_type = type.children[0];
const auto& offsets = column_array->get_offsets();
vectorized::IColumn::Permutation permutation(offsets.back());
for (size_t r = 0; r < offsets.size(); ++r) {
for (size_t r = 0; r < row_count; ++r) {
for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
permutation[c] = rows ? (*rows)[r] : r;
}
}
fmt::format_to(error_prefix, "ARRAY type failed: ");
RETURN_IF_ERROR(_validate_column(state, nested_type, type.contains_nulls[0],
column_array->get_data_ptr(), slot_index, stop_processing,
error_prefix, &permutation));
error_prefix, permutation.size(), &permutation));
break;
}
case TYPE_MAP: {
Expand All @@ -376,18 +362,18 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
auto val_type = type.children[1];
const auto& offsets = column_map->get_offsets();
vectorized::IColumn::Permutation permutation(offsets.back());
for (size_t r = 0; r < offsets.size(); ++r) {
for (size_t r = 0; r < row_count; ++r) {
for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
permutation[c] = rows ? (*rows)[r] : r;
}
}
fmt::format_to(error_prefix, "MAP type failed: ");
RETURN_IF_ERROR(_validate_column(state, key_type, type.contains_nulls[0],
column_map->get_keys_ptr(), slot_index, stop_processing,
error_prefix, &permutation));
error_prefix, permutation.size(), &permutation));
RETURN_IF_ERROR(_validate_column(state, val_type, type.contains_nulls[1],
column_map->get_values_ptr(), slot_index, stop_processing,
error_prefix, &permutation));
error_prefix, permutation.size(), &permutation));
break;
}
case TYPE_STRUCT: {
Expand All @@ -398,7 +384,8 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
RETURN_IF_ERROR(_validate_column(state, type.children[sc], type.contains_nulls[sc],
column_struct->get_column_ptr(sc), slot_index,
stop_processing, error_prefix));
stop_processing, error_prefix,
column_struct->get_column_ptr(sc)->size()));
}
break;
}
Expand All @@ -411,15 +398,11 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
// 1. column is nullable but the desc is not nullable
// 2. desc->type is BITMAP
if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) {
for (int j = 0; j < column->size(); ++j) {
for (int j = 0; j < row_count; ++j) {
auto row = rows ? (*rows)[j] : j;
if (row == last_invalid_row) {
continue;
}
if (null_map[j] && !_filter_bitmap.Get(row)) {
if (null_map[j] && !_filter_map[row]) {
fmt::format_to(error_msg, "null value for not null column, type={}",
type.debug_string());
last_invalid_row = row;
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
Expand All @@ -429,7 +412,8 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
}

Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::Block* block,
int64_t& filtered_rows, bool* stop_processing) {
const uint32_t rows, int& filtered_rows,
bool* stop_processing) {
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
DCHECK(block->columns() > i)
Expand All @@ -444,12 +428,12 @@ Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::
fmt::memory_buffer error_prefix;
fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
RETURN_IF_ERROR(_validate_column(state, desc->type(), desc->is_nullable(), column, i,
stop_processing, error_prefix));
stop_processing, error_prefix, rows));
}

filtered_rows = 0;
for (int i = 0; i < block->rows(); ++i) {
filtered_rows += _filter_bitmap.Get(i);
for (int i = 0; i < rows; ++i) {
filtered_rows += _filter_map[i];
}
return Status::OK();
}
Expand Down
12 changes: 6 additions & 6 deletions be/src/vec/sink/vtablet_block_convertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ namespace doris::vectorized {
class OlapTableBlockConvertor {
public:
OlapTableBlockConvertor(TupleDescriptor* output_tuple_desc)
: _output_tuple_desc(output_tuple_desc), _filter_bitmap(1024) {}
: _output_tuple_desc(output_tuple_desc) {}

Status validate_and_convert_block(RuntimeState* state, vectorized::Block* input_block,
std::shared_ptr<vectorized::Block>& block,
vectorized::VExprContextSPtrs output_vexpr_ctxs, size_t rows,
bool& has_filtered_rows);

const Bitmap& filter_bitmap() { return _filter_bitmap; }
const char* filter_map() const { return _filter_map.data(); }

int64_t validate_data_ns() const { return _validate_data_ns; }

Expand All @@ -66,15 +66,15 @@ class OlapTableBlockConvertor {

Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows = nullptr);

// Validate the input data against the OLAP table schema.
// The number of invalid/filtered rows is written to filtered_rows.
// Each invalid row is flagged in _filter_map.
// set stop_processing if we want to stop the whole process now.
Status _validate_data(RuntimeState* state, vectorized::Block* block, int64_t& filtered_rows,
bool* stop_processing);
Status _validate_data(RuntimeState* state, vectorized::Block* block, const uint32_t rows,
int& filtered_rows, bool* stop_processing);

// some output column of output expr may have different nullable property with dest slot desc
// so here need to do the convert operation
Expand All @@ -94,7 +94,7 @@ class OlapTableBlockConvertor {
std::map<int, int128_t> _max_decimal128_val;
std::map<int, int128_t> _min_decimal128_val;

Bitmap _filter_bitmap;
std::vector<char> _filter_map;

int64_t _validate_data_ns = 0;
int64_t _num_filtered_rows = 0;
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/sink/vtablet_sink_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc
RowsForTablet rows_for_tablet;
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
auto num_rows = block->rows();
const auto num_rows = input_rows;
const auto* __restrict filter_map = _block_convertor->filter_map();
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) {
if (UNLIKELY(has_filtered_rows) && filter_map[i]) {
continue;
}
const VOlapTablePartition* partition = nullptr;
Expand Down
Loading

0 comments on commit d2dc461

Please sign in to comment.