
Commit fb88cf8

[fix](iceberg) use custom method to encode special characters in field names
1 parent e29d8cb commit fb88cf8

File tree

12 files changed: +113 -23 lines changed


be/src/vec/exec/format/parquet/schema_desc.cpp

Lines changed: 67 additions & 0 deletions
@@ -26,6 +26,7 @@
 #include "common/logging.h"
 #include "runtime/define_primitive_type.h"
 #include "util/slice.h"
+#include "util/string_util.h"

 namespace doris::vectorized {

@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
     return type;
 }

+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
+static bool is_valid_avro_name(const std::string& name) {
+    int length = name.length();
+    char first = name[0];
+    if (!(isalpha(first) || first == '_')) {
+        return false;
+    }
+
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!(isalpha(character) || isdigit(character) || character == '_')) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static void sanitize_avro_name(std::ostringstream& buf, char character) {
+    if (isdigit(character)) {
+        buf << '_' << character;
+    } else {
+        std::stringstream ss;
+        ss << std::hex << (int)character;
+        std::string hex_str = ss.str();
+        buf << "_x" << doris::to_lower(hex_str);
+    }
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static std::string sanitize_avro_name(const std::string& name) {
+    std::ostringstream buf;
+    int length = name.length();
+    char first = name[0];
+    if (!(isalpha(first) || first == '_')) {
+        sanitize_avro_name(buf, first);
+    } else {
+        buf << first;
+    }
+
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!(isalpha(character) || isdigit(character) || character == '_')) {
+            sanitize_avro_name(buf, character);
+        } else {
+            buf << character;
+        }
+    }
+    return buf.str();
+}
+
+void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    for (const std::string& col : read_columns) {
+        if (!is_valid_avro_name(col)) {
+            std::string sanitize_name = sanitize_avro_name(col);
+            auto it = _name_to_field.find(sanitize_name);
+            if (it != _name_to_field.end()) {
+                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
+                schema->name = col;
+                _name_to_field.emplace(col, schema);
+                _name_to_field.erase(sanitize_name);
+            }
+        }
+    }
+}
+
 TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
     TypeDescriptor type;
     if (logicalType.__isset.STRING) {
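To make the encoding concrete, below is a minimal standalone sketch (illustrative only, not part of the commit: the sanitize/sanitize_char helpers are local stand-ins for the functions added above, and doris::to_lower is replaced with a plain lower() so the snippet compiles on its own). It reproduces the mapping that org.apache.iceberg.avro.AvroSchemaUtil#sanitize applies when writing the schema and that FieldDescriptor::iceberg_sanitize reverses when reading: a leading digit gets an extra underscore ("2col" becomes "_2col"), and any other invalid character becomes "_x" plus its lowercase hex code ("col-1" becomes "col_x2d1", since '-' is 0x2d).

#include <cctype>
#include <iostream>
#include <sstream>
#include <string>

// Local replacement for doris::to_lower, so the sketch is self-contained.
static std::string lower(std::string s) {
    for (char& c : s) c = std::tolower(static_cast<unsigned char>(c));
    return s;
}

// Same rules as the patch: a leading digit gets an extra '_' prefix,
// any other invalid character becomes "_x" + its lowercase hex code.
static void sanitize_char(std::ostringstream& buf, char character) {
    if (std::isdigit(static_cast<unsigned char>(character))) {
        buf << '_' << character;
    } else {
        std::stringstream ss;
        ss << std::hex << (int)character;
        buf << "_x" << lower(ss.str());
    }
}

// Mirrors the copied AvroSchemaUtil#sanitize logic on a whole name.
static std::string sanitize(const std::string& name) {
    std::ostringstream buf;
    char first = name[0];
    if (!(std::isalpha(static_cast<unsigned char>(first)) || first == '_')) {
        sanitize_char(buf, first);
    } else {
        buf << first;
    }
    for (size_t i = 1; i < name.length(); i++) {
        char c = name[i];
        if (!(std::isalpha(static_cast<unsigned char>(c)) ||
              std::isdigit(static_cast<unsigned char>(c)) || c == '_')) {
            sanitize_char(buf, c);
        } else {
            buf << c;
        }
    }
    return buf.str();
}

int main() {
    // "2col"  -> "_2col"    (leading digit)
    // "col-1" -> "col_x2d1" ('-' is 0x2d)
    std::cout << sanitize("2col") << "\n" << sanitize("col-1") << "\n";
    return 0;
}

On the read side, iceberg_sanitize does the inverse lookup: for each requested column that is not a valid Avro name, it computes the sanitized form, finds that entry in _name_to_field, and re-keys it under the original column name.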

be/src/vec/exec/format/parquet/schema_desc.h

Lines changed: 4 additions & 0 deletions
@@ -91,6 +91,10 @@ class FieldDescriptor {
 public:
     TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);

+    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special characters,
+    // we have to decode these characters
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;

be/src/vec/exec/format/parquet/vparquet_file_metadata.h

Lines changed: 3 additions & 0 deletions
@@ -32,6 +32,9 @@ class FileMetaData {
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
     const tparquet::FileMetaData& to_thrift();
+    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
+        _schema.iceberg_sanitize(read_columns);
+    }
     std::string debug_string() const;

 private:

be/src/vec/exec/format/parquet/vparquet_reader.cpp

Lines changed: 6 additions & 0 deletions
@@ -306,6 +306,12 @@ void ParquetReader::_init_file_description() {
     }
 }

+void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    if (_file_metadata != nullptr) {
+        _file_metadata->iceberg_sanitize(read_columns);
+    }
+}
+
 Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::vector<std::string>& missing_column_names,

be/src/vec/exec/format/parquet/vparquet_reader.h

Lines changed: 3 additions & 0 deletions
@@ -138,6 +138,9 @@ class ParquetReader : public GenericReader {

     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }

+    // Only for iceberg reader to sanitize invalid column names
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                     partition_columns,

be/src/vec/exec/format/table/iceberg_reader.cpp

Lines changed: 1 addition & 0 deletions
@@ -128,6 +128,7 @@ Status IcebergTableReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+    parquet_reader->iceberg_sanitize(_all_required_col_names);
     Status status = parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
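Note the ordering here: iceberg_sanitize is invoked before parquet_reader->init_reader, presumably so the sanitized field names in _name_to_field are already renamed back to the requested column names by the time the reader resolves columns. The call simply fans down the chain added above: IcebergTableReader::init_reader → ParquetReader::iceberg_sanitize → FileMetaData::iceberg_sanitize → FieldDescriptor::iceberg_sanitize.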

be/src/vec/exec/scan/vfile_scanner.cpp

Lines changed: 8 additions & 14 deletions
@@ -467,7 +467,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {

 Status VFileScanner::_fill_columns_from_path(size_t rows) {
     DataTypeSerDe::FormatOptions _text_formatOptions;
-    for (auto& kv : *_partition_columns) {
+    for (auto& kv : _partition_col_descs) {
         auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
         IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
         auto& [value, slot_desc] = kv.second;
@@ -500,7 +500,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }

     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto& kv : *_missing_columns) {
+    for (auto& kv : _missing_col_descs) {
         if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -930,9 +930,8 @@ Status VFileScanner::_get_next_reader() {
 }

 Status VFileScanner::_generate_fill_columns() {
-    _partition_columns.reset(
-            new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
-    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
+    _partition_col_descs.clear();
+    _missing_col_descs.clear();

     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -949,8 +948,8 @@ Status VFileScanner::_generate_fill_columns() {
                 if (size == 4 && memcmp(data, "null", 4) == 0) {
                     data = const_cast<char*>("\\N");
                 }
-                _partition_columns->emplace(slot_desc->col_name(),
-                                            std::make_tuple(data, slot_desc));
+                _partition_col_descs.emplace(slot_desc->col_name(),
+                                             std::make_tuple(data, slot_desc));
             }
         }
     }
@@ -969,16 +968,11 @@ Status VFileScanner::_generate_fill_columns() {
                 return Status::InternalError("failed to find default value expr for slot: {}",
                                              slot_desc->col_name());
             }
-            _missing_columns->emplace(slot_desc->col_name(), it->second);
+            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
         }
     }

-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
-    if (_cur_reader->fill_all_columns()) {
-        _partition_columns.reset(nullptr);
-        _missing_columns.reset(nullptr);
-    }
-    return Status::OK();
+    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
 }

 Status VFileScanner::_init_expr_ctxes() {

be/src/vec/exec/scan/vfile_scanner.h

Lines changed: 3 additions & 3 deletions
@@ -162,9 +162,9 @@ class VFileScanner : public VScanner {
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;

-    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
-            _partition_columns;
-    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            _partition_col_descs;
+    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;

fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java

Lines changed: 4 additions & 4 deletions
@@ -435,7 +435,7 @@ public List<Column> initSchema() {
         } else {
             List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
             for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName(),
+                tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                         HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
                         true, field.getComment(), true, -1));
             }
@@ -484,7 +484,7 @@ private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
         Schema schema = icebergTable.schema();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
         for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName(),
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                     HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
                             IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
                     true, null, true, false, null, field.getComment(), true, null,
@@ -500,7 +500,7 @@ protected void initPartitionColumns(List<Column> schema) {
         for (String partitionKey : partitionKeys) {
             // Do not use "getColumn()", which will cause dead loop
             for (Column column : schema) {
-                if (partitionKey.equals(column.getName())) {
+                if (partitionKey.equalsIgnoreCase(column.getName())) {
                     // For partition column, if it is string type, change it to varchar(65535)
                     // to be same as doris managed table.
                     // This is to avoid some unexpected behavior such as different partition pruning result
@@ -524,7 +524,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
                 return getHiveColumnStats(colName);
             case ICEBERG:
                 return StatisticsUtil.getIcebergColumnStats(colName,
-                        Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+                    Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
             default:
                 LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
         }

fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java

Lines changed: 2 additions & 1 deletion
@@ -36,6 +36,7 @@

 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;

 public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public List<Column> initSchema() {
         List<Types.NestedField> columns = schema.columns();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
         for (Types.NestedField field : columns) {
-            tmpSchema.add(new Column(field.name(),
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                     icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
                     schema.caseInsensitiveFindField(field.name()).fieldId()));
         }
