
[fix](hive) support null_format and escape_char for hive text (apache#40291)

## Proposed changes

Related PR: apache#39700

Support the `escape.delim` and `serialization.null.format` SerDe properties for Hive text tables, on both the read path (BE text reader) and the write path (`VCSVTransformer` / `HiveTableSink`); previously the FE rejected these properties when reading.
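
To make the two options concrete before the diff, here is a minimal, self-contained sketch of the behavior, with `Options` standing in for Doris's `FormatOptions` and a `std::string` standing in for `BufferWritable` (illustrative names only, not the Doris API):

```cpp
#include <iostream>
#include <string>

// Minimal model of the FormatOptions fields this patch adds/uses.
struct Options {
    char escape_char = 0;            // from escape.delim
    bool need_escape[256] = {false}; // bytes the writer must escape
    std::string null_format = "\\N"; // from serialization.null.format
};

// Serialize one nullable string cell the way the text writer now does:
// NULL uses the configured null format, data bytes get escaped on demand.
std::string write_cell(const std::string* value, const Options& opts) {
    if (value == nullptr) {
        return opts.null_format;
    }
    std::string out;
    for (char c : *value) {
        if (opts.need_escape[(unsigned char)c]) {
            out += opts.escape_char;
        }
        out += c;
    }
    return out;
}

int main() {
    Options opts;
    opts.escape_char = '|';
    opts.need_escape[(unsigned char)'|'] = true; // escape the escape char itself
    opts.need_escape[0x01] = true;               // and the ^A field delimiter
    opts.null_format = "null";

    std::cout << write_cell(nullptr, opts) << '\n'; // prints: null
    std::string v = "a|b";
    std::cout << write_cell(&v, opts) << '\n';      // prints: a||b
}
```

With `escape.delim = '|'` and `serialization.null.format = 'null'`, NULL cells are written as `null` instead of a hard-coded `\N`, and delimiter/escape bytes inside string values are prefixed with `|`.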
suxiaogang223 committed Oct 10, 2024
1 parent 657fb5b commit d941524
Showing 17 changed files with 191 additions and 67 deletions.
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -169,6 +169,9 @@ Status DataTypeArraySerDe::deserialize_one_cell_from_hive_text(
for (int idx = 0, start = 0; idx <= slice.size; idx++) {
char c = (idx == slice.size) ? collection_delimiter : slice[idx];
if (c == collection_delimiter) {
+ if (options.escape_char != 0 && idx > 0 && slice[idx - 1] == options.escape_char) {
+     continue;
+ }
slices.emplace_back(slice.data + start, idx - start);
start = idx + 1;
}
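The added check makes the element splitter ignore a collection delimiter that is preceded by the escape character. The same rule in isolation (a sketch with illustrative names, not the Doris API):

```cpp
#include <iostream>
#include <string>
#include <vector>

// Split `s` on `delim`, skipping delimiters preceded by `escape_char`,
// mirroring the loop in DataTypeArraySerDe::deserialize_one_cell_from_hive_text.
std::vector<std::string> split_hive_text(const std::string& s, char delim, char escape_char) {
    std::vector<std::string> parts;
    size_t start = 0;
    for (size_t i = 0; i <= s.size(); ++i) {
        // Treat the end of input as one trailing delimiter so the last part is flushed.
        char c = (i == s.size()) ? delim : s[i];
        if (c == delim) {
            if (escape_char != 0 && i > 0 && i < s.size() && s[i - 1] == escape_char) {
                continue; // escaped delimiter: part of the element, keep scanning
            }
            parts.push_back(s.substr(start, i - start));
            start = i + 1;
        }
    }
    return parts;
}

int main() {
    // With escape '\', "a\,b,c" has two elements: "a\,b" and "c".
    for (const auto& part : split_hive_text("a\\,b,c", ',', '\\')) {
        std::cout << part << '\n';
    }
}
```

The escaped delimiter stays inside the element; the escape byte itself is removed later, when the element is deserialized as a string.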
10 changes: 7 additions & 3 deletions be/src/vec/data_types/serde/data_type_map_serde.cpp
@@ -97,13 +97,17 @@ Status DataTypeMapSerDe::deserialize_one_cell_from_hive_text(
*
* So i use 'kv <= from' in order to get _map_kv_delimiter that appears first.
* */
- if (i < slice.size && slice[i] == map_kv_delimiter && kv <= from) {
+ if (i < slice.size && slice[i] == map_kv_delimiter && kv <= from &&
+     (options.escape_char == 0 || i == 0 || slice[i - 1] != options.escape_char)) {
kv = i;
continue;
}
if ((i == slice.size || slice[i] == collection_delimiter) && i >= kv + 1) {
- key_slices.push_back({slice.data + from, kv - from});
- value_slices.push_back({slice.data + kv + 1, i - 1 - kv});
+ if (options.escape_char != 0 && i > 0 && slice[i - 1] == options.escape_char) {
+     continue;
+ }
+ key_slices.emplace_back(slice.data + from, kv - from);
+ value_slices.emplace_back(slice.data + kv + 1, i - 1 - kv);
from = i + 1;
kv = from;
}
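Worked example for the map path (assuming escape char `\`, kv delimiter `:`, and collection delimiter `,`): in the entry `a\:b:c`, the first `:` is preceded by the escape byte, so the new condition rejects it as a kv delimiter; the second `:` becomes the key/value split, giving key `a\:b` and value `c`. The escape byte inside the key is stripped later, when the key itself is deserialized as a string.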
4 changes: 2 additions & 2 deletions be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -88,7 +88,7 @@ Status DataTypeNullableSerDe::serialize_one_cell_to_hive_text(

const auto& col_null = assert_cast<const ColumnNullable&>(*ptr);
if (col_null.is_null_at(row_num)) {
- bw.write(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), 2);
+ bw.write(options.null_format, options.null_len);
} else {
RETURN_IF_ERROR(nested_serde->serialize_one_cell_to_hive_text(
col_null.get_nested_column(), row_num, bw, options,
@@ -101,7 +101,7 @@ Status DataTypeNullableSerDe::deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
int hive_text_complex_type_delimiter_level) const {
auto& null_column = assert_cast<ColumnNullable&>(column);
- if (slice.size == 2 && slice[0] == '\\' && slice[1] == 'N') {
+ if (slice.compare(Slice(options.null_format, options.null_len)) == 0) {
null_column.insert_data(nullptr, 0);
return Status::OK();
}
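With the default options a NULL cell still round-trips as `\N`, but both directions now honor `serialization.null.format`: the writer emits the configured bytes, and the reader maps any field that compares byte-for-byte equal to them back to NULL. The comparison is exact: with `serialization.null.format = 'null'` (as in the serde test tables below), only the bare four bytes `null` read back as NULL, while a quoted value such as `"null"` remains an ordinary string.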
17 changes: 11 additions & 6 deletions be/src/vec/data_types/serde/data_type_serde.h
@@ -137,6 +137,10 @@ class DataTypeSerDe {
bool converted_from_string = false;

char escape_char = 0;
+ /**
+  * flags for each byte to indicate if escape is needed.
+  */
+ bool need_escape[256] = {false};

/**
* only used for export data
@@ -148,8 +152,8 @@
* NULL
* null
*/
- const char* null_format;
- int null_len;
+ const char* null_format = "\\N";
+ int null_len = 2;

/**
* The wrapper char for string type in nested type.
@@ -166,7 +170,7 @@
CHECK(0 <= hive_text_complex_type_delimiter_level &&
hive_text_complex_type_delimiter_level <= 153);

- char ans = '\002';
+ char ans;
//https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySerDeParameters.java#L250
//use only control chars that are very unlikely to be part of the string
// the following might/likely to be used in text files for strings
@@ -175,8 +179,9 @@
// 12 (form feed, FF, \f, ^L),
// 13 (carriage return, CR, \r, ^M),
// 27 (escape, ESC, \e [GCC only], ^[).

- if (hive_text_complex_type_delimiter_level == 1) {
+ if (hive_text_complex_type_delimiter_level == 0) {
+     ans = field_delim[0];
+ } else if (hive_text_complex_type_delimiter_level == 1) {
ans = collection_delim;
} else if (hive_text_complex_type_delimiter_level == 2) {
ans = map_key_delim;
@@ -192,7 +197,7 @@
} else if (hive_text_complex_type_delimiter_level <= 25) {
// [22, 25] -> [28, 31]
ans = hive_text_complex_type_delimiter_level + 6;
- } else if (hive_text_complex_type_delimiter_level <= 153) {
+ } else {
// [26, 153] -> [-128, -1]
ans = hive_text_complex_type_delimiter_level + (-26 - 128);
}
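End to end, the function now maps nesting level 0 to the table's field delimiter, levels 1 and 2 to the collection and map-key delimiters, and deeper levels to the Hive-reserved control and high bytes. The sketch below reproduces the mapping; the branches hidden by this diff (levels 3 through 21) are assumed from the Hive `LazySerDeParameters` table linked in the comment, which skips `\t`, `\n`, `\f`, `\r`, and ESC:

```cpp
#include <cstdio>

// Sketch of DataTypeSerDe::get_collection_delimiter. Only the branches visible
// in this diff are guaranteed to match Doris; the middle ones are assumptions.
char get_collection_delimiter(int level, char field_delim = '\001',
                              char collection_delim = '\002', char map_key_delim = '\003') {
    char ans;
    if (level == 0) {
        ans = field_delim;              // new in this patch
    } else if (level == 1) {
        ans = collection_delim;
    } else if (level == 2) {
        ans = map_key_delim;
    } else if (level <= 7) {
        ans = (char)(level + 1);        // assumed: [3, 7] -> [4, 8]
    } else if (level == 8) {
        ans = (char)11;                 // assumed: skip \t (9) and \n (10)
    } else if (level <= 21) {
        ans = (char)(level + 5);        // assumed: [9, 21] -> [14, 26], skipping \f and \r
    } else if (level <= 25) {
        ans = (char)(level + 6);        // [22, 25] -> [28, 31]
    } else {
        ans = (char)(level - 26 - 128); // [26, 153] -> [-128, -1]
    }
    return ans;
}

int main() {
    const int levels[] = {0, 1, 2, 3, 8, 22, 26, 153};
    for (int level : levels) {
        printf("level %3d -> byte %3d\n", level, (unsigned char)get_collection_delimiter(level));
    }
}
```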
42 changes: 42 additions & 0 deletions be/src/vec/data_types/serde/data_type_string_serde.h
@@ -96,6 +96,27 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
return Status::OK();
}

+ Status serialize_one_cell_to_hive_text(
+         const IColumn& column, int row_num, BufferWritable& bw, FormatOptions& options,
+         int hive_text_complex_type_delimiter_level = 1) const override {
+     auto result = check_column_const_set_readability(column, row_num);
+     ColumnPtr ptr = result.first;
+     row_num = result.second;
+     const auto& value = assert_cast<const ColumnType&>(*ptr).get_data_at(row_num);
+     if constexpr (std::is_same_v<ColumnType, ColumnString>) {
+         if (options.escape_char != 0) {
+             StringRef str_ref = value;
+             write_with_escaped_char_to_hive_text(str_ref, bw, options.escape_char,
+                                                  options.need_escape);
+         } else {
+             bw.write(value.data, value.size);
+         }
+     } else {
+         bw.write(value.data, value.size);
+     }
+     return Status::OK();
+ }

inline void write_with_escaped_char_to_json(StringRef value, BufferWritable& bw) const {
for (char it : value) {
switch (it) {
@@ -126,6 +147,17 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
}
}

+ inline void write_with_escaped_char_to_hive_text(StringRef value, BufferWritable& bw,
+                                                  char escape_char,
+                                                  const bool need_escape[]) const {
+     for (char it : value) {
+         if (need_escape[it & 0xff]) {
+             bw.write(escape_char);
+         }
+         bw.write(it);
+     }
+ }
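For example, with `escape_char = '|'` and `need_escape` marking `|` itself plus the delimiter bytes (as `VCSVTransformer` sets up below), the value `a|b` is serialized as `a||b`, and a value containing the `\001` field delimiter gets a `|` written in front of that byte; all other bytes pass through unchanged.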

Status serialize_column_to_json(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override {
SERIALIZE_COLUMN_TO_JSON();
@@ -154,6 +186,16 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
return Status::OK();
}

+ Status deserialize_one_cell_from_hive_text(
+         IColumn& column, Slice& slice, const FormatOptions& options,
+         int hive_text_complex_type_delimiter_level = 1) const override {
+     if (options.escape_char != 0) {
+         escape_string(slice.data, slice.size, options.escape_char);
+     }
+     assert_cast<ColumnType&>(column).insert_data(slice.data, slice.size);
+     return Status::OK();
+ }

Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const override {
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -250,6 +250,9 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_hive_text(
char* data = slice.data;
for (size_t i = 0, from = 0; i <= slice.size; i++) {
if (i == slice.size || data[i] == struct_delimiter) {
+ if (options.escape_char != 0 && i > 0 && data[i - 1] == options.escape_char) {
+     continue;
+ }
slices.push_back({data + from, i - from});
from = i + 1;
}
34 changes: 31 additions & 3 deletions be/src/vec/exec/format/csv/csv_reader.cpp
@@ -174,6 +174,23 @@ void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>*
}
}

+ void HiveCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
+     const char* data = line.data;
+     const size_t size = line.size;
+     size_t value_start = 0;
+     for (size_t i = 0; i < size; ++i) {
+         if (data[i] == _value_sep[0]) {
+             // hive will escape the field separator in string
+             if (_escape_char != 0 && i > 0 && data[i - 1] == _escape_char) {
+                 continue;
+             }
+             process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
+             value_start = i + _value_sep_len;
+         }
+     }
+     process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
+ }
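For example, with field separator `,` and escape char `\`, the line `1,a\,b,c` is split into the three fields `1`, `a\,b`, and `c`: the second comma is preceded by the escape byte, so the splitter treats it as field data. The `\` itself is removed later, when `deserialize_one_cell_from_hive_text` unescapes the string field.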

CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx)
@@ -352,6 +369,12 @@ Status CsvReader::init_reader(bool is_load) {
} else {
_options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
}

+ if (_params.file_attributes.text_params.__isset.null_format) {
+     _options.null_format = _params.file_attributes.text_params.null_format.data();
+     _options.null_len = _params.file_attributes.text_params.null_format.length();
+ }

_use_nullable_string_opt.resize(_file_slot_descs.size());
for (int i = 0; i < _file_slot_descs.size(); ++i) {
auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
@@ -376,9 +399,14 @@
if (_enclose == 0) {
text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
_line_delimiter, _line_delimiter_length, _keep_cr);

- _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
-         _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
+ if (_text_serde_type == TTextSerdeType::HIVE_TEXT_SERDE) {
+     _fields_splitter = std::make_unique<HiveCsvTextFieldSplitter>(
+             _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1,
+             _escape);
+ } else {
+     _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
+             _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
+ }
} else {
text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
16 changes: 16 additions & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
@@ -170,6 +170,22 @@ class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter>
std::string _value_sep;
};

+ class HiveCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<HiveCsvTextFieldSplitter> {
+ public:
+     explicit HiveCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
+                                       const string& value_sep, size_t value_sep_len = 1,
+                                       char trimming_char = 0, char escape_char = 0)
+             : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
+               _value_sep(value_sep),
+               _escape_char(escape_char) {}
+
+     void do_split(const Slice& line, std::vector<Slice>* splitted_values);
+
+ private:
+     std::string _value_sep;
+     char _escape_char;
+ };

class CsvReader : public GenericReader {
ENABLE_FACTORY_CREATOR(CsvReader);

15 changes: 13 additions & 2 deletions be/src/vec/runtime/vcsv_transformer.cpp
@@ -63,10 +63,21 @@ VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* fil
}

if (_is_text_format) {
_options.field_delim = hive_serde_properties->field_delim;
_options.collection_delim = hive_serde_properties->collection_delim[0];
_options.map_key_delim = hive_serde_properties->mapkv_delim[0];
- _options.escape_char = hive_serde_properties->escape_char[0];
- _options.null_format = hive_serde_properties->null_format.c_str();
+ if (hive_serde_properties->__isset.escape_char) {
+     _options.escape_char = hive_serde_properties->escape_char[0];
+ }
+ _options.null_format = hive_serde_properties->null_format.data();
+ _options.null_len = hive_serde_properties->null_format.length();
+ // The list of separators + escapeChar are the bytes required to be escaped.
+ if (_options.escape_char != 0) {
+     _options.need_escape[_options.escape_char & 0xff] = true;
+ }
+ for (int i = 0; i <= 153; i++) {
+     _options.need_escape[_options.get_collection_delimiter(i) & 0xff] = true;
+ }
}
}

@@ -32,7 +32,6 @@ CREATE TABLE IF NOT EXISTS `text_all_types`(
`t_decimal_precision_38` decimal(38,16),
`t_binary` binary
)
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION
'/user/doris/preinstalled_data/text/text_all_types';
@@ -88,31 +88,21 @@ CREATE TABLE `serde_test7`(
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'escape.delim' = '|'
)
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

CREATE TABLE `serde_test8`(
`id` int,
`name` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = 'a',
'escape.delim' = '|',
'serialization.null.format' = 'null'
)
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

CREATE TABLE `serde_test8` like `serde_test7`;

insert into serde_test1 values(1, "abc"),(2, "def");
insert into serde_test2 values(1, "abc"),(2, "def");
insert into serde_test3 values(1, "abc"),(2, "def");
insert into serde_test4 values(1, "abc"),(2, "def");
insert into serde_test5 values(1, "abc"),(2, "def");
insert into serde_test6 values(1, "abc"),(2, "def");
insert into serde_test7 values(1, "abc"),(2, "def");
insert into serde_test8 values(1, "abc"),(2, "def");
insert into serde_test7 values(1, null),(2, "|||"),(3, "aaa"),(4, "\"null\"");
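
A sketch of what these rows look like on disk, assuming `serde_test7` keeps LazySimpleSerDe's default `\001` field delimiter and `\N` null format while using `|` as the escape character: row 1 is stored as `1\001\N` (a real NULL), row 2 as `2\001||||||` (each literal `|` escaped by another `|`), and row 4 as `4\001"null"`. Reading the table back must therefore restore one NULL, the three-character string `|||`, and the quoted string unchanged.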
@@ -482,21 +482,24 @@ protected TFileAttributes getFileAttributes() throws UserException {
if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
textParams.setEnclose(serdeParams.get(PROP_QUOTE_CHAR).getBytes()[0]);
}

- // TODO: support escape char and null format in csv_reader
- Optional<String> escapeChar = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
+ // 6. set escape delimiter
+ Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
          PROP_ESCAPE_DELIMITER);
- if (escapeChar.isPresent() && !escapeChar.get().equals(DEFAULT_ESCAPE_DELIMIER)) {
-     throw new UserException(
-             "not support serde prop " + PROP_ESCAPE_DELIMITER + " in hive text reading");
+ if (escapeDelim.isPresent()) {
+     String escape = HiveMetaStoreClientHelper.getByte(escapeDelim.get());
+     if (escape != null) {
+         textParams.setEscape(escape.getBytes()[0]);
+     } else {
+         textParams.setEscape(DEFAULT_ESCAPE_DELIMIER.getBytes()[0]);
+     }
  }

+ // 7. set null format
  Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
          PROP_NULL_FORMAT);
- if (nullFormat.isPresent() && !nullFormat.get().equals(DEFAULT_NULL_FORMAT)) {
-     throw new UserException(
-             "not support serde prop " + PROP_NULL_FORMAT + " in hive text reading");
- }
+ textParams.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
+         DEFAULT_NULL_FORMAT, nullFormat));

TFileAttributes fileAttributes = new TFileAttributes();
fileAttributes.setTextParams(textParams);
@@ -242,9 +242,16 @@ private void setSerDeProperties(THiveTableSink tSink) {
// 5. set escape delimiter
Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_ESCAPE_DELIMITER);
- serDeProperties
-         .setEscapeChar(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                 DEFAULT_ESCAPE_DELIMIER, escapeDelim)));
+ if (escapeDelim.isPresent()) {
+     String escape = HiveMetaStoreClientHelper.getByte(escapeDelim.get());
+     if (escape != null) {
+         serDeProperties.setEscapeChar(escape);
+     } else {
+         serDeProperties.setEscapeChar(DEFAULT_ESCAPE_DELIMIER);
+     }
+ }
// 6. set null format
Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_NULL_FORMAT);
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
@@ -258,6 +258,7 @@ struct TFileTextScanRangeParams {
4: optional string mapkv_delimiter;
5: optional i8 enclose;
6: optional i8 escape;
+ 7: optional string null_format;
}

struct TFileScanSlotInfo {

Large diffs are not rendered by default.
