Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](ES catalog)Doris cannot parse ES date field without time zone #24864

Merged
merged 6 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,42 @@ Status get_int_value(const rapidjson::Value& col, PrimitiveType type, void* slot

template <typename T, typename RT>
Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool is_date_str,
RT* slot) {
RT* slot, const cctz::time_zone& time_zone) {
constexpr bool is_datetime_v1 = std::is_same_v<T, vectorized::VecDateTimeValue>;
T dt_val;
if (is_date_str) {
const std::string str_date = col.GetString();
int str_length = col.GetStringLength();
bool success = false;
// YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DDTHH:MM:SS+08:00 or 2022-08-08T12:10:10.000Z
if (str_length > 19) {
std::chrono::system_clock::time_point tp;
const bool ok =
cctz::parse("%Y-%m-%dT%H:%M:%E*S%Ez", str_date, cctz::utc_time_zone(), &tp);
// time_zone suffix pattern
// Z/+08:00/-04:30
RE2 time_zone_pattern(R"([+-]\d{2}:\d{2}|Z)");
bool ok = false;
std::string fmt;
re2::StringPiece value;
if (time_zone_pattern.Match(str_date, 0, str_date.size(), RE2::UNANCHORED, &value, 1)) {
// with time_zone info
// YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DDTHH:MM:SS+08:00
// or 2022-08-08T12:10:10.000Z or YYYY-MM-DDTHH:MM:SS-08:00
fmt = "%Y-%m-%dT%H:%M:%E*S%Ez";
cctz::time_zone ctz;
// find time_zone by time_zone suffix string
TimezoneUtils::find_cctz_time_zone(value.as_string(), ctz);
ok = cctz::parse(fmt, str_date, ctz, &tp);
} else {
// without time_zone info
// 2022-08-08T12:10:10.000
fmt = "%Y-%m-%dT%H:%M:%E*S";
// If the time without time_zone info, ES will assume it is UTC time.
// So we parse it in Doris with UTC time zone.
ok = cctz::parse(fmt, str_date, cctz::utc_time_zone(), &tp);
}
if (ok) {
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp),
cctz::local_time_zone());
// The local time zone can change by session variable `time_zone`
// We should use the user specified time zone, not the actual system local time zone.
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
}
} else if (str_length == 19) {
// YYYY-MM-DDTHH:MM:SS
Expand All @@ -211,7 +232,7 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
cctz::parse("%Y-%m-%dT%H:%M:%S", str_date, cctz::utc_time_zone(), &tp);
if (ok) {
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp),
cctz::local_time_zone());
time_zone);
}
} else {
// YYYY-MM-DD HH:MM:SS
Expand All @@ -222,7 +243,7 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
// string long like "1677895728000"
int64_t time_long = std::atol(str_date.c_str());
if (time_long > 0) {
success = dt_val.from_unixtime(time_long / 1000, cctz::local_time_zone());
success = dt_val.from_unixtime(time_long / 1000, time_zone);
}
} else {
// YYYY-MM-DD or others
Expand All @@ -234,7 +255,7 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
}

} else {
if (!dt_val.from_unixtime(col.GetInt64() / 1000, cctz::local_time_zone())) {
if (!dt_val.from_unixtime(col.GetInt64() / 1000, time_zone)) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
}
Expand All @@ -251,37 +272,37 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
}

template <typename T, typename RT>
Status get_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value,
RT* slot) {
Status get_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value, RT* slot,
const cctz::time_zone& time_zone) {
// this would happend just only when `enable_docvalue_scan = false`, and field has timestamp format date from _source
if (col.IsNumber()) {
// ES process date/datetime field would use millisecond timestamp for index or docvalue
// processing date type field, if a number is encountered, Doris On ES will force it to be processed according to ms
// Doris On ES needs to be consistent with ES, so just divided by 1000 because the unit for from_unixtime is seconds
return get_date_value_int<T, RT>(col, type, false, slot);
return get_date_value_int<T, RT>(col, type, false, slot, time_zone);
} else if (col.IsArray() && pure_doc_value && !col.Empty()) {
// this would happened just only when `enable_docvalue_scan = true`
// ES add default format for all field after ES 6.4, if we not provided format for `date` field ES would impose
// a standard date-format for date field as `2020-06-16T00:00:00.000Z`
// At present, we just process this string format date. After some PR were merged into Doris, we would impose `epoch_mills` for
// date field's docvalue
if (col[0].IsString()) {
return get_date_value_int<T, RT>(col[0], type, true, slot);
return get_date_value_int<T, RT>(col[0], type, true, slot, time_zone);
}
// ES would return millisecond timestamp for date field, divided by 1000 because the unit for from_unixtime is seconds
return get_date_value_int<T, RT>(col[0], type, false, slot);
return get_date_value_int<T, RT>(col[0], type, false, slot, time_zone);
} else {
// this would happened just only when `enable_docvalue_scan = false`, and field has string format date from _source
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
return get_date_value_int<T, RT>(col, type, true, slot);
return get_date_value_int<T, RT>(col, type, true, slot, time_zone);
}
}
template <typename T, typename RT>
Status fill_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value,
vectorized::IColumn* col_ptr) {
vectorized::IColumn* col_ptr, const cctz::time_zone& time_zone) {
RT data;
RETURN_IF_ERROR((get_date_int<T, RT>(col, type, pure_doc_value, &data)));
RETURN_IF_ERROR((get_date_int<T, RT>(col, type, pure_doc_value, &data, time_zone)));
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&data)), 0);
return Status::OK();
}
Expand Down Expand Up @@ -437,7 +458,8 @@ const std::string& ScrollParser::get_scroll_id() {
Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
std::vector<vectorized::MutableColumnPtr>& columns,
bool* line_eof,
const std::map<std::string, std::string>& docvalue_context) {
const std::map<std::string, std::string>& docvalue_context,
const cctz::time_zone& time_zone) {
*line_eof = true;

if (_size <= 0 || _line_index >= _size) {
Expand Down Expand Up @@ -635,16 +657,17 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
case TYPE_DATE:
case TYPE_DATETIME:
RETURN_IF_ERROR((fill_date_int<vectorized::VecDateTimeValue, int64_t>(
col, type, pure_doc_value, col_ptr)));
col, type, pure_doc_value, col_ptr, time_zone)));
break;
case TYPE_DATEV2:
RETURN_IF_ERROR(
(fill_date_int<vectorized::DateV2Value<vectorized::DateV2ValueType>, uint32_t>(
col, type, pure_doc_value, col_ptr)));
col, type, pure_doc_value, col_ptr, time_zone)));
break;
case TYPE_DATETIMEV2: {
RETURN_IF_ERROR((fill_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(col, type, pure_doc_value, col_ptr)));
RETURN_IF_ERROR(
(fill_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(col, type, pure_doc_value, col_ptr, time_zone)));
break;
}
case TYPE_ARRAY: {
Expand Down Expand Up @@ -752,15 +775,17 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
uint32_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateV2ValueType>,
uint32_t>(sub_col, sub_type, pure_doc_value, &data)));
uint32_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(sub_col, sub_type, pure_doc_value, &data)));
uint64_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
break;
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/es/es_scroll_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ class ScrollParser {
~ScrollParser();

Status parse(const std::string& scroll_result, bool exactly_once = false);
// Add time_zone info to convert time field of ES to local time zone of Doris
Status fill_columns(const TupleDescriptor* _tuple_desc,
std::vector<vectorized::MutableColumnPtr>& columns, bool* line_eof,
const std::map<std::string, std::string>& docvalue_context);
const std::map<std::string, std::string>& docvalue_context,
const cctz::time_zone& time_zone);

const std::string& get_scroll_id();
int get_size() const;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/new_es_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ Status NewEsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& column
COUNTER_UPDATE(rows_read_counter, 1);
SCOPED_TIMER(materialize_timer);
RETURN_IF_ERROR(_es_scroll_parser->fill_columns(_tuple_desc, columns, &_line_eof,
_docvalue_context));
_docvalue_context, _state->timezone_obj()));
if (!_line_eof) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test7": 1659931810000,
"test8": "2022-08-08T12:10:10Z",
"test9": 12345,
"test10": "2022-08-08T12:10:10.151Z",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"test3": 3.14,
"test4": "2022-08-08",
"test5": 12345,
"test6": "2022-08-08T12:10:10.151Z",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test7": "2022-08-09 12:10:10",
"test8": 1660018210000,
"test9": 2222.2,
"test10": "2022-08-08T12:10:10.151+08:00",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"test3": 4,
"test4": "2022-08-08",
"test5": 2222.2,
"test6": "2022-08-08T12:10:10.151+08:00",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test7": 1660104610000,
"test8": "2022-08-10T12:10:10",
"test9": 3333.22,
"test10": "2022-08-08T12:10:10.151-04:30",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"test3": 5.0,
"test4": "2022-08-08",
"test5": "3333.22",
"test6": "2022-08-08T12:10:10.151",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test7": "2022-08-11 12:10:10",
"test8": "2022-08-11T12:10:10+09:00",
"test9": "4444.22",
"test10": "2022-08-08T12:10:10.151",
"c_bool": [true, false, true, true],
"c_byte": [1, -2, -3, 4],
"c_short": [128, 129, -129, -130],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
"test5": {
"type": "long"
},
"test6": {
"type": "date"
},
"c_bool": {
"type": "boolean"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
"test5": {
"type": "long"
},
"test6": {
"type": "date"
},
"c_bool": {
"type": "boolean"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
"test9": {
"type": "long"
},
"test10": {
"type": "date"
},
"c_bool": {
"type": "boolean"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
"test9": {
"type": "long"
},
"test10": {
"type": "date"
},
"c_bool": {
"type": "boolean"
},
Expand Down
Loading