[fix](iceberg) iceberg use custom method to encode special characters of field name (#27108) (#27205)

Fix two bugs:
1. Missing-column matching is case sensitive; change the column names to lower case in the FE for hive/iceberg/hudi.
2. Iceberg uses a custom method to encode special characters in column names; decode the column names in the parquet reader so they match the right columns.
AshinGau authored Nov 17, 2023
1 parent 41f759a commit f67ed9f
Showing 12 changed files with 112 additions and 22 deletions.
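For context on the second bug: Iceberg writes Avro-compatible schemas, and org.apache.iceberg.avro.AvroSchemaUtil#sanitize rewrites any column name that is not a valid Avro name, so a Hive column such as /dsd/sv_cnt_grp is stored in the Parquet file as _x2fdsd_x2fsv_cnt_grp and no longer matches the table column. Below is a minimal standalone sketch of that encoding rule; the sanitize helper here is illustrative only, mirroring the helpers added in schema_desc.cpp further down, and is not part of the patch:

#include <cctype>
#include <iostream>
#include <sstream>
#include <string>

// Illustrative re-implementation of the rule: a leading digit is prefixed
// with '_', and any other invalid character becomes "_x<lowercase hex>".
static std::string sanitize(const std::string& name) {
    std::ostringstream buf;
    for (size_t i = 0; i < name.length(); ++i) {
        char c = name[i];
        if (isalpha(c) || c == '_' || (i > 0 && isdigit(c))) {
            buf << c;
        } else if (isdigit(c)) {
            buf << '_' << c;                   // leading digit
        } else {
            buf << "_x" << std::hex << (int)c; // e.g. '/' (0x2f) -> "_x2f"
        }
    }
    return buf.str();
}

int main() {
    std::cout << sanitize("/dsd/sv_cnt_grp") << '\n'; // prints _x2fdsd_x2fsv_cnt_grp
    std::cout << sanitize("2col") << '\n';            // prints _2col
}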
67 changes: 67 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -26,6 +26,7 @@
#include "common/logging.h"
#include "runtime/define_primitive_type.h"
#include "util/slice.h"
#include "util/string_util.h"

namespace doris::vectorized {

@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& physical_schema) {
    return type;
}

// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
static bool is_valid_avro_name(const std::string& name) {
    int length = name.length();
    char first = name[0];
    if (!isalpha(first) && first != '_') {
        return false;
    }

    for (int i = 1; i < length; i++) {
        char character = name[i];
        if (!isalpha(character) && !isdigit(character) && character != '_') {
            return false;
        }
    }
    return true;
}

// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
static void sanitize_avro_name(std::ostringstream& buf, char character) {
    if (isdigit(character)) {
        buf << '_' << character;
    } else {
        std::stringstream ss;
        ss << std::hex << (int)character;
        std::string hex_str = ss.str();
        buf << "_x" << doris::to_lower(hex_str);
    }
}

// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
static std::string sanitize_avro_name(const std::string& name) {
    std::ostringstream buf;
    int length = name.length();
    char first = name[0];
    if (!isalpha(first) && first != '_') {
        sanitize_avro_name(buf, first);
    } else {
        buf << first;
    }

    for (int i = 1; i < length; i++) {
        char character = name[i];
        if (!isalpha(character) && !isdigit(character) && character != '_') {
            sanitize_avro_name(buf, character);
        } else {
            buf << character;
        }
    }
    return buf.str();
}

void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
    for (const std::string& col : read_columns) {
        if (!is_valid_avro_name(col)) {
            std::string sanitize_name = sanitize_avro_name(col);
            auto it = _name_to_field.find(sanitize_name);
            if (it != _name_to_field.end()) {
                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
                schema->name = col;
                _name_to_field.emplace(col, schema);
                _name_to_field.erase(sanitize_name);
            }
        }
    }
}

TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
    TypeDescriptor type;
    if (logicalType.__isset.STRING) {
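To make the decode direction concrete, here is a self-contained model of what FieldDescriptor::iceberg_sanitize above does to the reader's name map; Field and name_to_field are illustrative stand-ins for the real FieldSchema and _name_to_field:

#include <cassert>
#include <string>
#include <unordered_map>

struct Field { std::string name; }; // stand-in for FieldSchema

int main() {
    // The Parquet footer was parsed with the sanitized name as the map key.
    Field field{"_x2fdsd_x2fsv_cnt_grp"};
    std::unordered_map<std::string, Field*> name_to_field{{field.name, &field}};

    // The query asks for the original table column name, so re-key the map
    // (and rename the field) to the name the caller will look up.
    const std::string requested = "/dsd/sv_cnt_grp";
    const std::string sanitized = "_x2fdsd_x2fsv_cnt_grp"; // sanitize(requested)
    if (auto it = name_to_field.find(sanitized); it != name_to_field.end()) {
        Field* f = it->second;
        f->name = requested;
        name_to_field.emplace(requested, f);
        name_to_field.erase(sanitized);
    }
    assert(name_to_field.count(requested) == 1);
}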
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
@@ -91,6 +91,10 @@ class FieldDescriptor {
    TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);

public:
    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special characters,
    // so we have to decode these characters
    void iceberg_sanitize(const std::vector<std::string>& read_columns);

    FieldDescriptor() = default;
    ~FieldDescriptor() = default;

3 changes: 3 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -32,6 +32,9 @@ class FileMetaData {
    Status init_schema();
    const FieldDescriptor& schema() const { return _schema; }
    const tparquet::FileMetaData& to_thrift();
    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
        _schema.iceberg_sanitize(read_columns);
    }
    std::string debug_string() const;

private:
6 changes: 6 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -294,6 +294,12 @@ void ParquetReader::_init_file_description() {
    }
}

void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
    if (_file_metadata != nullptr) {
        _file_metadata->iceberg_sanitize(read_columns);
    }
}

Status ParquetReader::init_reader(
        const std::vector<std::string>& all_column_names,
        const std::vector<std::string>& missing_column_names,
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -139,6 +139,9 @@ class ParquetReader : public GenericReader {

    const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }

    // Only for iceberg reader to sanitize invalid column names
    void iceberg_sanitize(const std::vector<std::string>& read_columns);

    Status set_fill_columns(
            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                    partition_columns,
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -129,6 +129,7 @@ Status IcebergTableReader::init_reader(
    _gen_file_col_names();
    _gen_new_colname_to_value_range();
    parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
    parquet_reader->iceberg_sanitize(_all_required_col_names);
    Status status = parquet_reader->init_reader(
            _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
            conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
22 changes: 8 additions & 14 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -418,7 +418,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
}

Status VFileScanner::_fill_columns_from_path(size_t rows) {
-    for (auto& kv : *_partition_columns) {
+    for (auto& kv : _partition_col_descs) {
        auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
        auto& [value, slot_desc] = kv.second;
@@ -437,7 +437,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
    }

    SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto& kv : *_missing_columns) {
+    for (auto& kv : _missing_col_descs) {
        if (kv.second == nullptr) {
            // no default column, fill with null
            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -862,9 +862,8 @@ Status VFileScanner::_get_next_reader() {
}

Status VFileScanner::_generate_fill_columns() {
-    _partition_columns.reset(
-            new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
-    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
+    _partition_col_descs.clear();
+    _missing_col_descs.clear();

    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -881,8 +880,8 @@
                if (size == 4 && memcmp(data, "null", 4) == 0) {
                    data = TextConverter::NULL_STR;
                }
-                _partition_columns->emplace(slot_desc->col_name(),
-                                            std::make_tuple(data, slot_desc));
+                _partition_col_descs.emplace(slot_desc->col_name(),
+                                             std::make_tuple(data, slot_desc));
            }
        }
    }
@@ -901,16 +900,11 @@
                return Status::InternalError("failed to find default value expr for slot: {}",
                                             slot_desc->col_name());
            }
-            _missing_columns->emplace(slot_desc->col_name(), it->second);
+            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
        }
    }

-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
-    if (_cur_reader->fill_all_columns()) {
-        _partition_columns.reset(nullptr);
-        _missing_columns.reset(nullptr);
-    }
-    return Status::OK();
+    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
}

Status VFileScanner::_init_expr_ctxes() {
6 changes: 3 additions & 3 deletions be/src/vec/exec/scan/vfile_scanner.h
@@ -162,9 +162,9 @@ class VFileScanner : public VScanner {
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
    std::unique_ptr<io::IOContext> _io_ctx;

-    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
-            _partition_columns;
-    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            _partition_col_descs;
+    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

private:
    RuntimeProfile::Counter* _get_block_timer = nullptr;
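A note on the vfile_scanner change above: the partition/missing column maps move from unique_ptr members that were re-allocated per range and reset to null once the reader filled all columns, to plain value members that are simply cleared per range, removing the per-range heap allocation and the null state every consumer had to check. A simplified before/after sketch of the pattern (hypothetical types, not the real scanner):

#include <memory>
#include <string>
#include <unordered_map>

using ColMap = std::unordered_map<std::string, std::string>;

struct ScannerBefore {
    std::unique_ptr<ColMap> partition_columns; // may be null after reset
    void prepare_range() {
        partition_columns.reset(new ColMap()); // heap allocation per range
        // ... later possibly: partition_columns.reset(nullptr);
        // so every consumer must test the pointer before dereferencing.
    }
};

struct ScannerAfter {
    ColMap partition_col_descs; // always valid, no null state to check
    void prepare_range() {
        partition_col_descs.clear(); // no new map object per range
    }
};

int main() {
    ScannerBefore b;
    b.prepare_range();
    ScannerAfter a;
    a.prepare_range();
}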
HMSExternalTable.java
@@ -435,7 +435,7 @@ public List<Column> initSchema() {
        } else {
            List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
            for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName(),
+                tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                        HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
                        true, field.getComment(), true, -1));
            }
@@ -484,7 +484,7 @@ private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
        Schema schema = icebergTable.schema();
        List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
        for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName(),
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                    HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
                            IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
                    true, null, true, false, null, field.getComment(), true, null,
@@ -500,7 +500,7 @@ protected void initPartitionColumns(List<Column> schema) {
        for (String partitionKey : partitionKeys) {
            // Do not use "getColumn()", which will cause a dead loop
            for (Column column : schema) {
-                if (partitionKey.equals(column.getName())) {
+                if (partitionKey.equalsIgnoreCase(column.getName())) {
                    // For partition column, if it is string type, change it to varchar(65535)
                    // to be the same as a doris managed table.
                    // This is to avoid some unexpected behavior such as different partition pruning results
IcebergExternalTable.java
@@ -36,6 +36,7 @@

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public List<Column> initSchema() {
        List<Types.NestedField> columns = schema.columns();
        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
        for (Types.NestedField field : columns) {
-            tmpSchema.add(new Column(field.name(),
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                    icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
                    schema.caseInsensitiveFindField(field.name()).fieldId()));
        }
test_external_catalog_iceberg_common.out
@@ -1,3 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
599715
599715

-- !sanitize_mara --
MATNR1 3.140 /DSD/SV_CNT_GRP1
MATNR2 3.240 /DSD/SV_CNT_GRP2
MATNR4 3.440 /DSD/SV_CNT_GRP4
MATNR5 3.540 /DSD/SV_CNT_GRP5
MATNR6 3.640 /DSD/SV_CNT_GRP6

test_external_catalog_iceberg_common.groovy
@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common", "p2,external,iceberg,external_remo
        }
        sql """ use `iceberg_catalog`; """
        q01_parquet()

        // test the special characters in table fields
        qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from sanitize_mara order by mAtNr"""
    }
}
