Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Opt](functions) Use preloaded cache to accelerate timezone parsing #22694

Merged
merged 4 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_TIMEZONE).empty()) {
request.__set_timezone(http_req->header(HTTP_TIMEZONE));
}
if (!http_req->header(HTTP_TIME_ZONE).empty()) {
request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
}
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
try {
request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT)));
Expand Down
1 change: 0 additions & 1 deletion be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ static const std::string HTTP_TEMP_PARTITIONS = "temporary_partitions";
static const std::string HTTP_NEGATIVE = "negative";
static const std::string HTTP_STRICT_MODE = "strict_mode";
static const std::string HTTP_TIMEZONE = "timezone";
static const std::string HTTP_TIME_ZONE = "time_zone";
static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <algorithm>
#include <map>
#include <memory>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
Expand All @@ -30,11 +31,13 @@
#include "olap/memtable_memory_limiter.h"
#include "olap/options.h"
#include "util/threadpool.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"

namespace doris {
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
} // namespace vectorized
namespace pipeline {
class TaskScheduler;
Expand Down Expand Up @@ -183,6 +186,8 @@ class ExecEnv {
_memtable_memory_limiter.reset(limiter);
}
#endif
vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }

// only for unit test
void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; }
Expand Down Expand Up @@ -268,6 +273,9 @@ class ExecEnv {
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;

std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
std::shared_mutex _zone_cache_rw_lock;
};

template <>
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
#include "util/timezone_utils.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"

Expand Down Expand Up @@ -118,6 +119,9 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {

TimezoneUtils::load_timezone_names();

_global_zone_cache = std::make_unique<vectorized::ZoneList>();
TimezoneUtils::load_timezones_to_cache(*_global_zone_cache);

ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(config::send_batch_thread_pool_thread_num)
.set_max_threads(config::send_batch_thread_pool_thread_num)
Expand Down
177 changes: 176 additions & 1 deletion be/src/util/timezone_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

#include "util/timezone_utils.h"

#include <cctz/civil_time.h>
#include <cctz/time_zone.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <re2/stringpiece.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <boost/algorithm/string.hpp>
#include <cctype>
Expand Down Expand Up @@ -68,6 +73,175 @@ void TimezoneUtils::load_timezone_names() {
}
}
}

namespace { // functions use only in this file

// Returns `value` with its bytes in reverse order (big-endian <-> little-endian).
// The lowest byte of the input is consumed first and ends up as the highest
// byte of the result.
template <typename T>
T swapEndianness(T value) {
    T reversed = 0;
    unsigned shift = 0;
    while (shift < 8 * sizeof(T)) {
        // Append the next least-significant input byte to the low end of the result.
        reversed = (reversed << 8) | ((value >> shift) & 0xFF);
        shift += 8;
    }
    return reversed;
}

// Reads one value of type T from the byte stream at `src`, advances `src` past
// it, and returns the value converted from big-endian (the byte order used by
// timezone information files) to the host byte order.
//
// The bytes are copied out with memcpy instead of dereferencing a casted
// pointer: `src` may sit at any offset inside the file image, so a direct
// `*reinterpret_cast<T*>(src)` would be a misaligned (and type-punned) load.
template <typename T>
T next_from_charstream(int8_t*& src) {
    T value;
    memcpy(&value, src, sizeof(T));
    src += sizeof(T);
    if constexpr (std::endian::native == std::endian::little) {
        return swapEndianness(
                value); // timezone information files use network endianess, which is big-endian
    } else if constexpr (std::endian::native == std::endian::big) {
        return value;
    } else {
        LOG(FATAL) << "Unknown endianess";
    }
    __builtin_unreachable();
}

// Reads the whole file at `path` into a freshly allocated heap buffer.
//
// Returns {buffer, length}. The caller owns `buffer` and must release it with
// `delete[]`. The buffer is never null and is always at least 4 bytes long, so
// a caller that probes the 4-byte magic before looking at the length stays in
// bounds.
//
// On any failure (file cannot be opened, is empty, does not fit in an int, or
// cannot be read completely) the result is a zero-filled buffer with length 0,
// which can never match a file magic.
std::pair<int8_t*, int> load_file_to_memory(const std::string& path) {
    constexpr int MIN_BUFFER_SIZE = 4;

    const int fd = open(path.c_str(), O_RDONLY);
    int len = 0;
    if (fd >= 0) {
        const off_t file_size = lseek(fd, 0, SEEK_END); // bytes
        const bool fits_in_int =
                file_size > 0 && static_cast<off_t>(static_cast<int>(file_size)) == file_size;
        // rewind so that the read loop below starts at the first byte
        if (fits_in_int && lseek(fd, 0, SEEK_SET) == 0) {
            len = static_cast<int>(file_size);
        }
    }

    // value-initialised, i.e. zero-filled
    int8_t* data = new int8_t[len < MIN_BUFFER_SIZE ? MIN_BUFFER_SIZE : len]();

    int done = 0;
    while (done < len) {
        const ssize_t got = read(fd, data + done, len - done);
        if (got <= 0) {
            break; // error or unexpected end of file
        }
        done += static_cast<int>(got);
    }
    if (done != len) {
        // partial read: report the file as unusable instead of returning half of it
        memset(data, 0, MIN_BUFFER_SIZE);
        len = 0;
    }

    if (fd >= 0) {
        close(fd);
    }
    return {data, len};
}

// Raw, byte-for-byte image of one 6-byte record as it is read from the file
// stream by next_from_charstream<ttinfo>. Every member is a uint8_t and the
// struct is explicitly aligned like uint8_t, so sizeof(ttinfo) equals the sum
// of the member sizes (checked by the static_assert below).
struct alignas(alignof(uint8_t)) ttinfo {
// The four bytes of the UTC offset; real_ttinfo turns them into an int32_t.
uint8_t tt_utoff[4]; // need force cast to int32_t
// Copied into real_ttinfo::is_dst.
uint8_t tt_isdst;
// Offset of this record's name inside the name table that follows the records.
uint8_t tt_desigidx;
};
// Number of bytes one record occupies in the stream; the reader advances its
// cursor by this amount per record.
constexpr static int TTINFO_SIZE = sizeof(ttinfo);
static_assert(TTINFO_SIZE == 6);

// Decoded form of a raw `ttinfo` record, with the offset as a real integer.
struct real_ttinfo {
    // Default construction yields a zeroed record (see the member initialisers).
    [[maybe_unused]] real_ttinfo() = default;

    // Intentionally implicit: existing code assigns a `ttinfo` straight to a
    // `real_ttinfo`. `arg.tt_utoff` must already be in host byte order.
    real_ttinfo(const ttinfo& arg) : is_dst(arg.tt_isdst), name_index(arg.tt_desigidx) {
        // tt_utoff is a 1-byte-aligned uint8_t[4]; reading it through an int32_t*
        // would be a misaligned, aliasing-violating load, so copy the bytes.
        memcpy(&diff_seconds, arg.tt_utoff, sizeof(diff_seconds));
    }

    int32_t diff_seconds = 0; // to UTC
    bool is_dst = false;
    uint8_t name_index = 0; // offset of the name inside the file's name table
};

// Specialisation for raw ttinfo records: copies TTINFO_SIZE bytes out of the
// stream, advances `src` past them and, on little-endian hosts, reverses the
// four offset bytes so that they are in host byte order. The two single-byte
// fields need no conversion.
template <>
ttinfo next_from_charstream<ttinfo>(int8_t*& src) {
    ttinfo record;
    memcpy(&record, src, TTINFO_SIZE);
    src += TTINFO_SIZE;
    if constexpr (std::endian::native == std::endian::little) {
        for (int lo = 0, hi = 3; lo < hi; ++lo, --hi) {
            std::swap(record.tt_utoff[lo], record.tt_utoff[hi]);
        }
    }
    return record;
}

/*
 * Parses one TZif block (header + data) from `data` and registers every
 * timezone abbreviation it declares in `zone_list` as a fixed-offset zone.
 * Existing entries are kept (emplace does not overwrite).
 *
 * follow the rule of tzfile(5) which defined in https://man7.org/linux/man-pages/man5/tzfile.5.html.
 * should change when it changes.
 *
 * @param zone_list  destination map: abbreviation -> fixed-offset time zone
 * @param data       start of the block to parse
 * @param len        number of readable bytes starting at `data`
 * @param first_time true for the leading block (4-byte transition times),
 *                   false for the second block (8-byte transition times)
 * @return false if the buffer is not a complete, well-formed TZif block, or if
 *         the header announces a second block that is not present.
 */
bool parse_load_timezone(vectorized::ZoneList& zone_list, int8_t* data, int len,
                         bool first_time = true) {
    // magic(4) + version(1) + reserved(15) + six 4-byte counters
    constexpr int HEADER_SIZE = 44;
    if (data == nullptr || len < HEADER_SIZE) [[unlikely]] {
        return false;
    }
    const int8_t* const end_pos = data + len;

    /* HEADERS */
    if (memcmp(data, "TZif", 4) != 0) [[unlikely]] { // magic number
        return false;
    }
    data += 4;

    // if version >= 2, the whole header&data will repeat itself one time.
    int8_t version = next_from_charstream<int8_t>(data) - '0';
    data += 15; // null bits
    int32_t ut_count = next_from_charstream<int32_t>(data);
    int32_t wall_count = next_from_charstream<int32_t>(data);
    int32_t leap_count = next_from_charstream<int32_t>(data);
    int32_t trans_time_count = next_from_charstream<int32_t>(data);
    int32_t type_count = next_from_charstream<int32_t>(data);
    int32_t char_count = next_from_charstream<int32_t>(data);

    // A corrupt header must not drive us outside the buffer.
    if (ut_count < 0 || wall_count < 0 || leap_count < 0 || trans_time_count < 0 ||
        type_count < 0 || char_count < 0) [[unlikely]] {
        return false;
    }

    // Times are 4 bytes wide in the first block and 8 bytes wide in the second.
    const int64_t time_size = first_time ? 4 : 8;
    // each transition: one time + one 1-byte type index
    const int64_t transitions_size = (time_size + 1) * trans_time_count;
    // each leap-second record: one time + one 4-byte correction
    const int64_t tail_size = (time_size + 4) * leap_count + wall_count + ut_count;
    const int64_t body_size =
            transitions_size + int64_t {TTINFO_SIZE} * type_count + char_count + tail_size;
    if (body_size > end_pos - data) [[unlikely]] {
        return false; // truncated file
    }

    /* HEADERS end, FIELDS begin*/
    // transition time points, which we don't need
    data += transitions_size;

    // timezones
    std::vector<real_ttinfo> timezones(type_count);
    for (int i = 0; i < type_count; i++) {
        ttinfo tz_data = next_from_charstream<ttinfo>(data);
        timezones[i] = tz_data; // cast by c'tor
    }

    // timezone names
    const char* name_zone = reinterpret_cast<const char*>(data);
    data += char_count;

    // concate names
    for (const auto& tz : timezones) {
        if (tz.name_index >= char_count) {
            continue; // index points outside the name table
        }
        const char* name = name_zone + tz.name_index;
        // look for the terminator inside the name table only
        const void* terminator = memchr(name, '\0', char_count - tz.name_index);
        if (terminator == nullptr) {
            continue; // unterminated name
        }
        zone_list.emplace(std::string {name, static_cast<const char*>(terminator)},
                          cctz::fixed_time_zone(cctz::seconds(tz.diff_seconds)));
    }

    // the second part. Versions 3 and 4 keep the two-block layout of version 2.
    if (version >= 2 && first_time) {
        // leap seconds, standard/wall indicators, UT/local indicators, which we don't need
        data += tail_size;

        return (data < end_pos) &&
               parse_load_timezone(zone_list, data, static_cast<int>(end_pos - data), false);
    }

    return true;
}

} // namespace

// Fills `cache_list` with fixed-offset time zones keyed by abbreviation.
//
// "CST" is pinned to UTC+8 before anything is read. The file parser inserts
// with emplace, so a "CST" found in a timezone file does not replace it.
//
// The timezone directory is $TZDIR if set and non-empty, otherwise
// /usr/share/zoneinfo. It is walked recursively, skipping the "posix" and
// "right" subtrees, and every regular file is handed to the TZif parser;
// files that are not valid timezone files are ignored.
//
// Filesystem errors never throw out of this function: a missing or unreadable
// directory is logged and leaves the cache with whatever was loaded so far.
void TimezoneUtils::load_timezones_to_cache(vectorized::ZoneList& cache_list) {
    cache_list["CST"] = cctz::fixed_time_zone(cctz::seconds(8 * 3600));

    std::string base_str;
    const char* tzdir = "/usr/share/zoneinfo"; // default
    // try get from System
    char* tzdir_env = std::getenv("TZDIR");
    if (tzdir_env && *tzdir_env) {
        tzdir = tzdir_env;
    }

    base_str += tzdir;
    base_str += '/';

    const std::set<std::string> ignore_paths = {"posix", "right"}; // duplications

    std::error_code walk_ec;
    std::filesystem::recursive_directory_iterator it {base_str, walk_ec};
    if (walk_ec) {
        LOG(WARNING) << "Cannot open timezone directory " << base_str << ": "
                     << walk_ec.message();
    }

    // On error both the constructor and increment() leave `it` equal to the end iterator.
    for (; it != end(it); it.increment(walk_ec)) {
        const auto& dir_entry = *it;
        std::error_code entry_ec; // e.g. a dangling symlink: treat as "neither file nor dir"
        if (dir_entry.is_regular_file(entry_ec)) {
            auto [handle, length] = load_file_to_memory(dir_entry.path().string());

            parse_load_timezone(cache_list, handle, length);

            delete[] handle;
        } else if (dir_entry.is_directory(entry_ec) &&
                   ignore_paths.contains(dir_entry.path().filename().string())) {
            it.disable_recursion_pending();
        }
    }
    if (walk_ec) {
        LOG(WARNING) << "Stopped scanning timezone directory " << base_str << ": "
                     << walk_ec.message();
    }

    cache_list.erase("LMT"); // local mean time for every timezone
    LOG(INFO) << "Read " << cache_list.size() << " timezones.";
}

bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_zone& ctz) {
auto timezone_lower = boost::algorithm::to_lower_copy(timezone);
re2::StringPiece value;
Expand Down Expand Up @@ -121,6 +295,7 @@ bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_
} else {
auto it = timezone_names_map_.find(timezone_lower);
if (it == timezone_names_map_.end()) {
VLOG_DEBUG << "Illegal timezone " << timezone_lower;
return false;
}
tz_parsed = cctz::load_time_zone(it->second, &ctz);
Expand Down
6 changes: 5 additions & 1 deletion be/src/util/timezone_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ class time_zone;

namespace doris {

namespace vectorized {
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
}

class TimezoneUtils {
public:
static void load_timezone_names();
static void load_timezones_to_cache(vectorized::ZoneList& cache_list);
static bool find_cctz_time_zone(const std::string& timezone, cctz::time_zone& ctz);

public:
static const std::string default_time_zone;

private:
Expand Down
Loading